2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2023 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.controlloop.ophistory;
24 import static org.assertj.core.api.Assertions.assertThatCode;
25 import static org.awaitility.Awaitility.await;
26 import static org.junit.jupiter.api.Assertions.assertEquals;
27 import static org.junit.jupiter.api.Assertions.assertTrue;
28 import static org.mockito.Mockito.doAnswer;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.never;
31 import static org.mockito.Mockito.spy;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.when;
35 import jakarta.persistence.EntityManagerFactory;
36 import java.time.Instant;
37 import java.util.Properties;
38 import java.util.UUID;
39 import java.util.concurrent.CountDownLatch;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicInteger;
42 import java.util.function.Consumer;
43 import org.junit.jupiter.api.AfterAll;
44 import org.junit.jupiter.api.AfterEach;
45 import org.junit.jupiter.api.BeforeAll;
46 import org.junit.jupiter.api.BeforeEach;
47 import org.junit.jupiter.api.Test;
48 import org.onap.policy.controlloop.ControlLoopOperation;
49 import org.onap.policy.controlloop.VirtualControlLoopEvent;
50 import org.onap.policy.controlloop.ophistory.OperationHistoryDataManagerParams.OperationHistoryDataManagerParamsBuilder;
52 class OperationHistoryDataManagerImplTest {
54 private static final IllegalStateException EXPECTED_EXCEPTION = new IllegalStateException("expected exception");
55 private static final String MY_LOOP_NAME = "my-loop-name";
56 private static final String MY_ACTOR = "my-actor";
57 private static final String MY_OPERATION = "my-operation";
58 private static final String MY_TARGET = "my-target";
59 private static final String MY_ENTITY = "my-entity";
60 private static final String REQ_ID = "my-request-id";
61 private static final int BATCH_SIZE = 5;
62 private static final int MAX_QUEUE_LENGTH = 23;
64 private static EntityManagerFactory emf;
66 private Thread thread = mock(Thread.class);
68 private OperationHistoryDataManagerParams params;
69 private Consumer<EntityManagerFactory> threadFunction;
70 private VirtualControlLoopEvent event;
71 private ControlLoopOperation operation;
72 private EntityManagerFactory emfSpy;
74 // decremented when the thread function completes
75 private CountDownLatch finished;
77 private OperationHistoryDataManagerImpl mgr;
81 * Sets up for all tests.
84 public static void setUpBeforeClass() {
85 var params = makeBuilder().build();
87 // capture the entity manager factory for re-use
88 new OperationHistoryDataManagerImpl(params) {
90 protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) {
91 emf = super.makeEntityManagerFactory(opsHistPu, props);
98 * Restores the environment after all tests.
101 public static void tearDownAfterClass() {
106 * Sets up for an individual test.
109 public void setUp() {
110 event = new VirtualControlLoopEvent();
111 event.setClosedLoopControlName(MY_LOOP_NAME);
112 event.setRequestId(UUID.randomUUID());
114 operation = new ControlLoopOperation();
115 operation.setActor(MY_ACTOR);
116 operation.setOperation(MY_OPERATION);
117 operation.setTarget(MY_TARGET);
118 operation.setSubRequestId(UUID.randomUUID().toString());
120 threadFunction = null;
121 finished = new CountDownLatch(1);
123 // prevent the "real" emf from being closed
125 doAnswer(ans -> null).when(emfSpy).close();
127 params = makeBuilder().build();
129 mgr = new PseudoThread();
134 public void tearDown() {
139 void testConstructor() {
140 // use a thread and manager that haven't been started yet
141 thread = mock(Thread.class);
142 mgr = new PseudoThread();
144 // should not start the thread before start() is called
145 verify(thread, never()).start();
149 // should have started the thread
150 verify(thread).start();
152 // invalid properties
154 assertThatCode(() -> new PseudoThread()).isInstanceOf(IllegalArgumentException.class)
155 .hasMessageContaining("data-manager-properties");
160 // this should have no effect
165 // this should also have no effect
166 assertThatCode(() -> mgr.start()).doesNotThrowAnyException();
170 void testStore_testStop() throws InterruptedException {
172 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
176 assertEquals(1, mgr.getRecordsCommitted());
180 * Tests stop() when the manager isn't running.
183 void testStopNotRunning() {
184 // use a manager that hasn't been started yet
185 mgr = new PseudoThread();
188 verify(emfSpy).close();
192 * Tests store() when it is already stopped.
195 void testStoreAlreadyStopped() throws InterruptedException {
199 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
201 assertEquals(0, mgr.getRecordsCommitted());
205 * Tests store() when when the queue is full.
208 void testStoreTooManyItems() throws InterruptedException {
209 final int nextra = 5;
210 for (int nitems = 0; nitems < MAX_QUEUE_LENGTH + nextra; ++nitems) {
211 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
216 assertEquals(MAX_QUEUE_LENGTH, mgr.getRecordsCommitted());
220 void testRun() throws InterruptedException {
222 // trigger thread shutdown when it completes this batch
223 when(emfSpy.createEntityManager()).thenAnswer(ans -> {
225 return emf.createEntityManager();
229 mgr = new RealThread();
232 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
233 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
234 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
238 verify(emfSpy).close();
240 assertEquals(3, mgr.getRecordsCommitted());
243 private void waitForThread() {
244 await().atMost(5, TimeUnit.SECONDS).until(() -> !thread.isAlive());
248 * Tests run() when the entity manager throws an exception.
251 void testRunException() throws InterruptedException {
252 var count = new AtomicInteger(0);
254 when(emfSpy.createEntityManager()).thenAnswer(ans -> {
255 if (count.incrementAndGet() == 2) {
256 // interrupt during one of the attempts
260 // throw an exception for each record
261 throw EXPECTED_EXCEPTION;
265 mgr = new RealThread();
268 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
269 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
270 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
274 verify(emfSpy).close();
278 * Tests storeRemainingRecords() when the entity manager throws an exception.
281 void testStoreRemainingRecordsException() throws InterruptedException {
282 // arrange to throw an exception
283 when(emfSpy.createEntityManager()).thenThrow(EXPECTED_EXCEPTION);
285 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
291 void testStoreRecord() throws InterruptedException {
293 * Note: we change sub-request ID each time to guarantee that the records are
298 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
301 operation = new ControlLoopOperation(operation);
302 operation.setSubRequestId(UUID.randomUUID().toString());
303 operation.setStart(Instant.now());
304 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
306 // both start and end times
307 operation = new ControlLoopOperation(operation);
308 operation.setSubRequestId(UUID.randomUUID().toString());
309 operation.setEnd(Instant.now());
310 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
313 operation = new ControlLoopOperation(operation);
314 operation.setSubRequestId(UUID.randomUUID().toString());
315 operation.setStart(null);
316 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
320 // all of them should have been stored
321 assertEquals(4, mgr.getRecordsCommitted());
324 assertEquals(4, mgr.getRecordsInserted());
325 assertEquals(0, mgr.getRecordsUpdated());
329 * Tests storeRecord() when records are updated.
332 void testStoreRecordUpdate() throws InterruptedException {
334 * Note: we do NOT change sub-request ID, so that records all refer to the same DB
339 operation.setStart(null);
340 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
343 operation = new ControlLoopOperation(operation);
344 operation.setStart(Instant.now());
345 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
347 // both start and end times
348 operation = new ControlLoopOperation(operation);
349 operation.setEnd(Instant.now());
350 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
353 operation = new ControlLoopOperation(operation);
354 operation.setStart(null);
355 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
359 // all of them should have been stored
360 assertEquals(4, mgr.getRecordsCommitted());
362 // only one new record
363 assertEquals(1, mgr.getRecordsInserted());
365 // remainder were updates
366 assertEquals(3, mgr.getRecordsUpdated());
369 private void runThread() throws InterruptedException {
370 if (threadFunction == null) {
374 var thread2 = new Thread(() -> {
375 threadFunction.accept(emfSpy);
376 finished.countDown();
379 thread2.setDaemon(true);
384 assertTrue(finished.await(5, TimeUnit.SECONDS));
387 private static OperationHistoryDataManagerParamsBuilder makeBuilder() {
389 return OperationHistoryDataManagerParams.builder()
390 .url("jdbc:h2:mem:" + OperationHistoryDataManagerImplTest.class.getSimpleName())
392 .driver("org.h2.Driver")
395 .batchSize(BATCH_SIZE)
396 .maxQueueLength(MAX_QUEUE_LENGTH);
401 * Manager that uses the shared DB.
403 private class SharedDb extends OperationHistoryDataManagerImpl {
409 protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) {
410 // re-use the same factory to avoid re-creating the DB for each test
416 * Manager that uses the shared DB and a pseudo thread.
418 private class PseudoThread extends SharedDb {
421 protected Thread makeThread(EntityManagerFactory emfactory, Consumer<EntityManagerFactory> command) {
422 threadFunction = command;
428 * Manager that uses the shared DB and catches the thread.
430 private class RealThread extends SharedDb {
433 protected Thread makeThread(EntityManagerFactory emfactory, Consumer<EntityManagerFactory> command) {
434 thread = super.makeThread(emfactory, command);