2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.ophistory;
23 import static org.assertj.core.api.Assertions.assertThatCode;
24 import static org.awaitility.Awaitility.await;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertTrue;
27 import static org.mockito.Mockito.doAnswer;
28 import static org.mockito.Mockito.mock;
29 import static org.mockito.Mockito.never;
30 import static org.mockito.Mockito.spy;
31 import static org.mockito.Mockito.verify;
32 import static org.mockito.Mockito.when;
34 import java.time.Instant;
35 import java.util.Properties;
36 import java.util.UUID;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicInteger;
40 import java.util.function.Consumer;
41 import javax.persistence.EntityManagerFactory;
42 import org.junit.After;
43 import org.junit.AfterClass;
44 import org.junit.Before;
45 import org.junit.BeforeClass;
46 import org.junit.Test;
47 import org.junit.runner.RunWith;
48 import org.mockito.Mock;
49 import org.mockito.junit.MockitoJUnitRunner;
50 import org.onap.policy.controlloop.ControlLoopOperation;
51 import org.onap.policy.controlloop.VirtualControlLoopEvent;
52 import org.onap.policy.controlloop.ophistory.OperationHistoryDataManagerParams.OperationHistoryDataManagerParamsBuilder;
54 @RunWith(MockitoJUnitRunner.class)
55 public class OperationHistoryDataManagerImplTest {
57 private static final IllegalStateException EXPECTED_EXCEPTION = new IllegalStateException("expected exception");
58 private static final String MY_LOOP_NAME = "my-loop-name";
59 private static final String MY_ACTOR = "my-actor";
60 private static final String MY_OPERATION = "my-operation";
61 private static final String MY_TARGET = "my-target";
62 private static final String MY_ENTITY = "my-entity";
63 private static final String REQ_ID = "my-request-id";
64 private static final int BATCH_SIZE = 5;
65 private static final int MAX_QUEUE_LENGTH = 23;
67 private static EntityManagerFactory emf;
70 private Thread thread;
72 private OperationHistoryDataManagerParams params;
73 private Consumer<EntityManagerFactory> threadFunction;
74 private VirtualControlLoopEvent event;
75 private ControlLoopOperation operation;
76 private EntityManagerFactory emfSpy;
78 // decremented when the thread function completes
79 private CountDownLatch finished;
81 private OperationHistoryDataManagerImpl mgr;
85 * Sets up for all tests.
88 public static void setUpBeforeClass() {
89 OperationHistoryDataManagerParams params = makeBuilder().build();
91 // capture the entity manager factory for re-use
92 new OperationHistoryDataManagerImpl(params) {
94 protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) {
95 emf = super.makeEntityManagerFactory(opsHistPu, props);
102 * Restores the environment after all tests.
105 public static void tearDownAfterClass() {
110 * Sets up for an individual test.
113 public void setUp() {
114 event = new VirtualControlLoopEvent();
115 event.setClosedLoopControlName(MY_LOOP_NAME);
116 event.setRequestId(UUID.randomUUID());
118 operation = new ControlLoopOperation();
119 operation.setActor(MY_ACTOR);
120 operation.setOperation(MY_OPERATION);
121 operation.setTarget(MY_TARGET);
122 operation.setSubRequestId(UUID.randomUUID().toString());
124 threadFunction = null;
125 finished = new CountDownLatch(1);
127 // prevent the "real" emf from being closed
129 doAnswer(ans -> null).when(emfSpy).close();
131 params = makeBuilder().build();
133 mgr = new PseudoThread();
138 public void tearDown() {
143 public void testConstructor() {
144 // use a thread and manager that haven't been started yet
145 thread = mock(Thread.class);
146 mgr = new PseudoThread();
148 // should not start the thread before start() is called
149 verify(thread, never()).start();
153 // should have started the thread
154 verify(thread).start();
156 // invalid properties
158 assertThatCode(() -> new PseudoThread()).isInstanceOf(IllegalArgumentException.class)
159 .hasMessageContaining("data-manager-properties");
163 public void testStart() {
164 // this should have no effect
169 // this should also have no effect
170 assertThatCode(() -> mgr.start()).doesNotThrowAnyException();
174 public void testStore_testStop() throws InterruptedException {
176 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
180 assertEquals(1, mgr.getRecordsCommitted());
184 * Tests stop() when the manager isn't running.
187 public void testStopNotRunning() {
188 // use a manager that hasn't been started yet
189 mgr = new PseudoThread();
192 verify(emfSpy).close();
196 * Tests store() when it is already stopped.
199 public void testStoreAlreadyStopped() throws InterruptedException {
203 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
205 assertEquals(0, mgr.getRecordsCommitted());
209 * Tests store() when when the queue is full.
212 public void testStoreTooManyItems() throws InterruptedException {
213 final int nextra = 5;
214 for (int nitems = 0; nitems < MAX_QUEUE_LENGTH + nextra; ++nitems) {
215 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
220 assertEquals(MAX_QUEUE_LENGTH, mgr.getRecordsCommitted());
224 public void testRun() throws InterruptedException {
226 // trigger thread shutdown when it completes this batch
227 when(emfSpy.createEntityManager()).thenAnswer(ans -> {
229 return emf.createEntityManager();
233 mgr = new RealThread();
236 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
237 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
238 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
242 verify(emfSpy).close();
244 assertEquals(3, mgr.getRecordsCommitted());
247 private void waitForThread() {
248 await().atMost(5, TimeUnit.SECONDS).until(() -> !thread.isAlive());
252 * Tests run() when the entity manager throws an exception.
255 public void testRunException() throws InterruptedException {
256 AtomicInteger count = new AtomicInteger(0);
258 when(emfSpy.createEntityManager()).thenAnswer(ans -> {
259 if (count.incrementAndGet() == 2) {
260 // interrupt during one of the attempts
264 // throw an exception for each record
265 throw EXPECTED_EXCEPTION;
269 mgr = new RealThread();
272 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
273 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
274 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
278 verify(emfSpy).close();
282 * Tests storeRemainingRecords() when the entity manager throws an exception.
285 public void testStoreRemainingRecordsException() throws InterruptedException {
286 // arrange to throw an exception
287 when(emfSpy.createEntityManager()).thenThrow(EXPECTED_EXCEPTION);
289 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
295 public void testStoreRecord() throws InterruptedException {
297 * Note: we change sub-request ID each time to guarantee that the records are
302 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
305 operation = new ControlLoopOperation(operation);
306 operation.setSubRequestId(UUID.randomUUID().toString());
307 operation.setStart(Instant.now());
308 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
310 // both start and end times
311 operation = new ControlLoopOperation(operation);
312 operation.setSubRequestId(UUID.randomUUID().toString());
313 operation.setEnd(Instant.now());
314 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
317 operation = new ControlLoopOperation(operation);
318 operation.setSubRequestId(UUID.randomUUID().toString());
319 operation.setStart(null);
320 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
324 // all of them should have been stored
325 assertEquals(4, mgr.getRecordsCommitted());
328 assertEquals(4, mgr.getRecordsInserted());
329 assertEquals(0, mgr.getRecordsUpdated());
333 * Tests storeRecord() when records are updated.
336 public void testStoreRecordUpdate() throws InterruptedException {
338 * Note: we do NOT change sub-request ID, so that records all refer to the same DB
343 operation.setStart(null);
344 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
347 operation = new ControlLoopOperation(operation);
348 operation.setStart(Instant.now());
349 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
351 // both start and end times
352 operation = new ControlLoopOperation(operation);
353 operation.setEnd(Instant.now());
354 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
357 operation = new ControlLoopOperation(operation);
358 operation.setStart(null);
359 mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
363 // all of them should have been stored
364 assertEquals(4, mgr.getRecordsCommitted());
366 // only one new record
367 assertEquals(1, mgr.getRecordsInserted());
369 // remainder were updates
370 assertEquals(3, mgr.getRecordsUpdated());
373 private void runThread() throws InterruptedException {
374 if (threadFunction == null) {
378 Thread thread2 = new Thread(() -> {
379 threadFunction.accept(emfSpy);
380 finished.countDown();
383 thread2.setDaemon(true);
388 assertTrue(finished.await(5, TimeUnit.SECONDS));
391 private static OperationHistoryDataManagerParamsBuilder makeBuilder() {
393 return OperationHistoryDataManagerParams.builder()
394 .url("jdbc:h2:mem:" + OperationHistoryDataManagerImplTest.class.getSimpleName())
397 .batchSize(BATCH_SIZE)
398 .maxQueueLength(MAX_QUEUE_LENGTH);
403 * Manager that uses the shared DB.
405 private class SharedDb extends OperationHistoryDataManagerImpl {
411 protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) {
412 // re-use the same factory to avoid re-creating the DB for each test
418 * Manager that uses the shared DB and a pseudo thread.
420 private class PseudoThread extends SharedDb {
423 protected Thread makeThread(EntityManagerFactory emfactory, Consumer<EntityManagerFactory> command) {
424 threadFunction = command;
430 * Manager that uses the shared DB and catches the thread.
432 private class RealThread extends SharedDb {
435 protected Thread makeThread(EntityManagerFactory emfactory, Consumer<EntityManagerFactory> command) {
436 thread = super.makeThread(emfactory, command);