2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2023 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.controlloop.ophistory;
24 import static org.assertj.core.api.Assertions.assertThatCode;
25 import static org.awaitility.Awaitility.await;
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.assertTrue;
28 import static org.mockito.Mockito.doAnswer;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.never;
31 import static org.mockito.Mockito.spy;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.when;
35 import java.time.Instant;
36 import java.util.Properties;
37 import java.util.UUID;
38 import java.util.concurrent.CountDownLatch;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.atomic.AtomicInteger;
41 import java.util.function.Consumer;
42 import javax.persistence.EntityManagerFactory;
43 import org.junit.After;
44 import org.junit.AfterClass;
45 import org.junit.Before;
46 import org.junit.BeforeClass;
47 import org.junit.Test;
48 import org.junit.runner.RunWith;
49 import org.mockito.Mock;
50 import org.mockito.junit.MockitoJUnitRunner;
51 import org.onap.policy.controlloop.ControlLoopOperation;
52 import org.onap.policy.controlloop.VirtualControlLoopEvent;
53 import org.onap.policy.controlloop.ophistory.OperationHistoryDataManagerParams.OperationHistoryDataManagerParamsBuilder;
55 @RunWith(MockitoJUnitRunner.class)
56 public class OperationHistoryDataManagerImplTest {
58 private static final IllegalStateException EXPECTED_EXCEPTION = new IllegalStateException("expected exception");
59 private static final String MY_LOOP_NAME = "my-loop-name";
60 private static final String MY_ACTOR = "my-actor";
61 private static final String MY_OPERATION = "my-operation";
62 private static final String MY_TARGET = "my-target";
63 private static final String MY_ENTITY = "my-entity";
64 private static final String REQ_ID = "my-request-id";
65 private static final int BATCH_SIZE = 5;
66 private static final int MAX_QUEUE_LENGTH = 23;
68 private static EntityManagerFactory emf;
71 private Thread thread;
73 private OperationHistoryDataManagerParams params;
74 private Consumer<EntityManagerFactory> threadFunction;
75 private VirtualControlLoopEvent event;
76 private ControlLoopOperation operation;
77 private EntityManagerFactory emfSpy;
79 // decremented when the thread function completes
80 private CountDownLatch finished;
82 private OperationHistoryDataManagerImpl mgr;
86 * Sets up for all tests.
89 public static void setUpBeforeClass() {
90 OperationHistoryDataManagerParams params = makeBuilder().build();
92 // capture the entity manager factory for re-use
93 new OperationHistoryDataManagerImpl(params) {
95 protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) {
96 emf = super.makeEntityManagerFactory(opsHistPu, props);
103 * Restores the environment after all tests.
106 public static void tearDownAfterClass() {
111 * Sets up for an individual test.
114 public void setUp() {
115 event = new VirtualControlLoopEvent();
116 event.setClosedLoopControlName(MY_LOOP_NAME);
117 event.setRequestId(UUID.randomUUID());
119 operation = new ControlLoopOperation();
120 operation.setActor(MY_ACTOR);
121 operation.setOperation(MY_OPERATION);
122 operation.setTarget(MY_TARGET);
123 operation.setSubRequestId(UUID.randomUUID().toString());
125 threadFunction = null;
126 finished = new CountDownLatch(1);
128 // prevent the "real" emf from being closed
130 doAnswer(ans -> null).when(emfSpy).close();
132 params = makeBuilder().build();
134 mgr = new PseudoThread();
139 public void tearDown() {
144 public void testConstructor() {
145 // use a thread and manager that haven't been started yet
146 thread = mock(Thread.class);
147 mgr = new PseudoThread();
149 // should not start the thread before start() is called
150 verify(thread, never()).start();
154 // should have started the thread
155 verify(thread).start();
157 // invalid properties
159 assertThatCode(() -> new PseudoThread()).isInstanceOf(IllegalArgumentException.class)
160 .hasMessageContaining("data-manager-properties");
164 public void testStart() {
165 // this should have no effect
170 // this should also have no effect
171 assertThatCode(() -> mgr.start()).doesNotThrowAnyException();
175 public void testStore_testStop() throws InterruptedException {
177 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
181 assertEquals(1, mgr.getRecordsCommitted());
185 * Tests stop() when the manager isn't running.
188 public void testStopNotRunning() {
189 // use a manager that hasn't been started yet
190 mgr = new PseudoThread();
193 verify(emfSpy).close();
197 * Tests store() when it is already stopped.
200 public void testStoreAlreadyStopped() throws InterruptedException {
204 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
206 assertEquals(0, mgr.getRecordsCommitted());
210 * Tests store() when when the queue is full.
213 public void testStoreTooManyItems() throws InterruptedException {
214 final int nextra = 5;
215 for (int nitems = 0; nitems < MAX_QUEUE_LENGTH + nextra; ++nitems) {
216 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
221 assertEquals(MAX_QUEUE_LENGTH, mgr.getRecordsCommitted());
225 public void testRun() throws InterruptedException {
227 // trigger thread shutdown when it completes this batch
228 when(emfSpy.createEntityManager()).thenAnswer(ans -> {
230 return emf.createEntityManager();
234 mgr = new RealThread();
237 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
238 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
239 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
243 verify(emfSpy).close();
245 assertEquals(3, mgr.getRecordsCommitted());
248 private void waitForThread() {
249 await().atMost(5, TimeUnit.SECONDS).until(() -> !thread.isAlive());
253 * Tests run() when the entity manager throws an exception.
256 public void testRunException() throws InterruptedException {
257 AtomicInteger count = new AtomicInteger(0);
259 when(emfSpy.createEntityManager()).thenAnswer(ans -> {
260 if (count.incrementAndGet() == 2) {
261 // interrupt during one of the attempts
265 // throw an exception for each record
266 throw EXPECTED_EXCEPTION;
270 mgr = new RealThread();
273 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
274 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
275 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
279 verify(emfSpy).close();
283 * Tests storeRemainingRecords() when the entity manager throws an exception.
286 public void testStoreRemainingRecordsException() throws InterruptedException {
287 // arrange to throw an exception
288 when(emfSpy.createEntityManager()).thenThrow(EXPECTED_EXCEPTION);
290 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
296 public void testStoreRecord() throws InterruptedException {
298 * Note: we change sub-request ID each time to guarantee that the records are
303 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
306 operation = new ControlLoopOperation(operation);
307 operation.setSubRequestId(UUID.randomUUID().toString());
308 operation.setStart(Instant.now());
309 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
311 // both start and end times
312 operation = new ControlLoopOperation(operation);
313 operation.setSubRequestId(UUID.randomUUID().toString());
314 operation.setEnd(Instant.now());
315 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
318 operation = new ControlLoopOperation(operation);
319 operation.setSubRequestId(UUID.randomUUID().toString());
320 operation.setStart(null);
321 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
325 // all of them should have been stored
326 assertEquals(4, mgr.getRecordsCommitted());
329 assertEquals(4, mgr.getRecordsInserted());
330 assertEquals(0, mgr.getRecordsUpdated());
334 * Tests storeRecord() when records are updated.
337 public void testStoreRecordUpdate() throws InterruptedException {
339 * Note: we do NOT change sub-request ID, so that records all refer to the same DB
344 operation.setStart(null);
345 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
348 operation = new ControlLoopOperation(operation);
349 operation.setStart(Instant.now());
350 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
352 // both start and end times
353 operation = new ControlLoopOperation(operation);
354 operation.setEnd(Instant.now());
355 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
358 operation = new ControlLoopOperation(operation);
359 operation.setStart(null);
360 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
364 // all of them should have been stored
365 assertEquals(4, mgr.getRecordsCommitted());
367 // only one new record
368 assertEquals(1, mgr.getRecordsInserted());
370 // remainder were updates
371 assertEquals(3, mgr.getRecordsUpdated());
374 private void runThread() throws InterruptedException {
375 if (threadFunction == null) {
379 Thread thread2 = new Thread(() -> {
380 threadFunction.accept(emfSpy);
381 finished.countDown();
384 thread2.setDaemon(true);
389 assertTrue(finished.await(5, TimeUnit.SECONDS));
392 private static OperationHistoryDataManagerParamsBuilder makeBuilder() {
394 return OperationHistoryDataManagerParams.builder()
395 .url("jdbc:h2:mem:" + OperationHistoryDataManagerImplTest.class.getSimpleName())
397 .driver("org.h2.Driver")
400 .batchSize(BATCH_SIZE)
401 .maxQueueLength(MAX_QUEUE_LENGTH);
406 * Manager that uses the shared DB.
408 private class SharedDb extends OperationHistoryDataManagerImpl {
414 protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) {
415 // re-use the same factory to avoid re-creating the DB for each test
421 * Manager that uses the shared DB and a pseudo thread.
423 private class PseudoThread extends SharedDb {
426 protected Thread makeThread(EntityManagerFactory emfactory, Consumer<EntityManagerFactory> command) {
427 threadFunction = command;
433 * Manager that uses the shared DB and catches the thread.
435 private class RealThread extends SharedDb {
438 protected Thread makeThread(EntityManagerFactory emfactory, Consumer<EntityManagerFactory> command) {
439 thread = super.makeThread(emfactory, command);