2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.ophistory;
23 import static org.assertj.core.api.Assertions.assertThatCode;
24 import static org.awaitility.Awaitility.await;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertTrue;
27 import static org.mockito.Mockito.doAnswer;
28 import static org.mockito.Mockito.mock;
29 import static org.mockito.Mockito.never;
30 import static org.mockito.Mockito.spy;
31 import static org.mockito.Mockito.verify;
32 import static org.mockito.Mockito.when;
34 import java.time.Instant;
35 import java.util.Properties;
36 import java.util.UUID;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicInteger;
40 import java.util.function.Consumer;
41 import javax.persistence.EntityManagerFactory;
42 import org.junit.After;
43 import org.junit.AfterClass;
44 import org.junit.Before;
45 import org.junit.BeforeClass;
46 import org.junit.Test;
47 import org.mockito.Mock;
48 import org.mockito.MockitoAnnotations;
49 import org.onap.policy.controlloop.ControlLoopOperation;
50 import org.onap.policy.controlloop.VirtualControlLoopEvent;
51 import org.onap.policy.controlloop.ophistory.OperationHistoryDataManagerParams.OperationHistoryDataManagerParamsBuilder;
53 public class OperationHistoryDataManagerImplTest {
// exception thrown from the stubbed createEntityManager() calls in the failure tests
55 private static final IllegalStateException EXPECTED_EXCEPTION = new IllegalStateException("expected exception");
// arguments passed to mgr.store() throughout the tests
56 private static final String MY_TARGET = "my-target";
57 private static final String MY_ENTITY = "my-entity";
58 private static final String REQ_ID = "my-request-id";
// values fed to the params builder in makeBuilder()
59 private static final int BATCH_SIZE = 5;
60 private static final int MAX_QUEUE_LENGTH = 23;
// real factory, assigned by the anonymous manager created in setUpBeforeClass()
62 private static EntityManagerFactory emf;
// worker thread: a mock in testConstructor(), or the thread captured by RealThread.makeThread()
65 private Thread thread;
// parameters rebuilt from makeBuilder() in setUp()
67 private OperationHistoryDataManagerParams params;
// command captured by PseudoThread.makeThread() and executed by runThread()
68 private Consumer<EntityManagerFactory> threadFunction;
// event and operation passed to mgr.store(); re-created in setUp()
69 private VirtualControlLoopEvent event;
70 private ControlLoopOperation operation;
// factory that the tests stub and verify; also the one handed to threadFunction
71 private EntityManagerFactory emfSpy;
73 // decremented when the thread function completes
74 private CountDownLatch finished;
// manager under test: a PseudoThread after setUp(), replaced by a RealThread in some tests
76 private OperationHistoryDataManagerImpl mgr;
80 * Sets up for all tests.
83 public static void setUpBeforeClass() {
// Builds default parameters and instantiates an anonymous manager subclass whose
// overridden factory hook stores the factory created by the superclass in the
// static "emf" field.
84 OperationHistoryDataManagerParams params = makeBuilder().build();
86 // capture the entity manager factory for re-use
87 new OperationHistoryDataManagerImpl(params) {
89 protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) {
// delegate to the real implementation, remembering the result
90 emf = super.makeEntityManagerFactory(opsHistPu, props);
97 * Restores the environment after all tests.
100 public static void tearDownAfterClass() {
105 * Sets up for an individual test.
108 public void setUp() {
// Resets the per-test fixtures: Mockito mocks, a fresh event and operation,
// the captured thread function, the completion latch, params and the manager.
109 MockitoAnnotations.initMocks(this);
// event with a random request ID
111 event = new VirtualControlLoopEvent();
112 event.setRequestId(UUID.randomUUID());
// operation with only its target populated
114 operation = new ControlLoopOperation();
115 operation.setTarget(MY_TARGET);
// nothing captured yet; the latch is counted down once by runThread()
117 threadFunction = null;
118 finished = new CountDownLatch(1);
120 // prevent the "real" emf from being closed
// NOTE(review): emfSpy is presumably (re)created as a spy of emf just before this stub - confirm
122 doAnswer(ans -> null).when(emfSpy).close();
124 params = makeBuilder().build();
// default manager records its thread command instead of running it (see PseudoThread)
126 mgr = new PseudoThread();
131 public void tearDown() {
136 public void testConstructor() {
// Checks that constructing a manager does not start its thread, that the thread
// is started afterwards, and that construction with invalid properties fails.
137 // use a thread and manager that haven't been started yet
138 thread = mock(Thread.class);
139 mgr = new PseudoThread();
141 // should not start the thread before start() is called
142 verify(thread, never()).start();
// NOTE(review): the manager is presumably started between these two verifications - confirm
146 // should have started the thread
147 verify(thread).start();
149 // invalid properties
// NOTE(review): confirm how params is made invalid before this construction attempt
// the thrown exception must be an IllegalArgumentException naming "data-manager-properties"
151 assertThatCode(() -> new PseudoThread()).isInstanceOf(IllegalArgumentException.class)
152 .hasMessageContaining("data-manager-properties");
156 public void testStart() {
// Checks that invoking start() on the fixture manager does not throw.
157 // this should have no effect
162 // this should also have no effect
163 assertThatCode(() -> mgr.start()).doesNotThrowAnyException();
167 public void testStore_testStop() throws InterruptedException {
// Queues a single record and expects the manager to report exactly one record added.
// NOTE(review): presumably the manager is stopped and the captured thread function
// is run between the store() call and the assertion - confirm
169 mgr.store(REQ_ID, event, MY_ENTITY, operation);
173 assertEquals(1, mgr.getRecordsAdded());
177 * Tests stop() when the manager isn't running.
180 public void testStopNotRunning() {
// Replaces the fixture manager with a fresh PseudoThread and verifies that
// close() is invoked on emfSpy.
// NOTE(review): the test name implies stop() is called before the verification - confirm
181 // use a manager that hasn't been started yet
182 mgr = new PseudoThread();
185 verify(emfSpy).close();
189 * Tests store() when it is already stopped.
192 public void testStoreAlreadyStopped() throws InterruptedException {
// Expects a record passed to store() not to be counted: getRecordsAdded() must stay at 0.
// NOTE(review): the test name implies the manager is stopped before store() is called - confirm
196 mgr.store(REQ_ID, event, MY_ENTITY, operation);
198 assertEquals(0, mgr.getRecordsAdded());
202 * Tests store() when the queue is full.
205 public void testStoreTooManyItems() throws InterruptedException {
// Submits more records than the configured queue length and expects only
// MAX_QUEUE_LENGTH of them to be counted as added.
// number of records submitted beyond the queue limit
206 final int nextra = 5;
207 for (int nitems = 0; nitems < MAX_QUEUE_LENGTH + nextra; ++nitems) {
208 mgr.store(REQ_ID, event, MY_ENTITY, operation);
// the extra records must not show up in the added count
213 assertEquals(MAX_QUEUE_LENGTH, mgr.getRecordsAdded());
217 public void testRun() throws InterruptedException {
// Uses a RealThread manager: stores three records, then verifies that emfSpy
// was closed and that all three records were added.
219 // trigger thread shutdown when it completes this batch
220 when(emfSpy.createEntityManager()).thenAnswer(ans -> {
// hand back an entity manager from the real factory
222 return emf.createEntityManager();
// switch to a manager whose makeThread() captures the thread it creates
226 mgr = new RealThread();
229 mgr.store(REQ_ID, event, MY_ENTITY, operation);
230 mgr.store(REQ_ID, event, MY_ENTITY, operation);
231 mgr.store(REQ_ID, event, MY_ENTITY, operation);
235 verify(emfSpy).close();
237 assertEquals(3, mgr.getRecordsAdded());
240 private void waitForThread() {
// Polls, for at most five seconds, until the captured "thread" is no longer alive.
241 await().atMost(5, TimeUnit.SECONDS).until(() -> !thread.isAlive());
245 * Tests run() when the entity manager throws an exception.
248 public void testRunException() throws InterruptedException {
// Uses a RealThread manager whose createEntityManager() stub throws
// EXPECTED_EXCEPTION, stores three records, and verifies that emfSpy is closed.
// counts invocations of the stubbed createEntityManager()
249 AtomicInteger count = new AtomicInteger(0);
251 when(emfSpy.createEntityManager()).thenAnswer(ans -> {
// the second invocation takes an extra step before throwing
252 if (count.incrementAndGet() == 2) {
253 // interrupt during one of the attempts
257 // throw an exception for each record
258 throw EXPECTED_EXCEPTION;
262 mgr = new RealThread();
265 mgr.store(REQ_ID, event, MY_ENTITY, operation);
266 mgr.store(REQ_ID, event, MY_ENTITY, operation);
267 mgr.store(REQ_ID, event, MY_ENTITY, operation);
271 verify(emfSpy).close();
275 * Tests storeRemainingRecords() when the entity manager throws an exception.
278 public void testStoreRemainingRecordsException() throws InterruptedException {
// Makes every createEntityManager() call on emfSpy throw EXPECTED_EXCEPTION,
// then queues a single record.
// NOTE(review): confirm which outcome this test asserts after the store() call
279 // arrange to throw an exception
280 when(emfSpy.createEntityManager()).thenThrow(EXPECTED_EXCEPTION);
282 mgr.store(REQ_ID, event, MY_ENTITY, operation);
288 public void testStoreRecord() throws InterruptedException {
// Stores four variants of the operation and expects all four to be counted.
// Each variant is a copy of the previous one with one timestamp changed.
// variant 1: the operation as prepared by setUp()
290 mgr.store(REQ_ID, event, MY_ENTITY, operation);
// variant 2: start time set
293 operation = new ControlLoopOperation(operation);
294 operation.setStart(Instant.now());
295 mgr.store(REQ_ID, event, MY_ENTITY, operation);
297 // both start and end times
298 operation = new ControlLoopOperation(operation);
299 operation.setEnd(Instant.now());
300 mgr.store(REQ_ID, event, MY_ENTITY, operation);
// variant 4: start time cleared again
303 operation = new ControlLoopOperation(operation);
304 operation.setStart(null);
305 mgr.store(REQ_ID, event, MY_ENTITY, operation);
309 // all of them should have been stored
310 assertEquals(4, mgr.getRecordsAdded());
313 private void runThread() throws InterruptedException {
// Runs the captured threadFunction against emfSpy on a separate daemon thread
// and asserts that it signals completion within five seconds.
// special-cases the situation where no thread function has been captured
314 if (threadFunction == null) {
318 Thread thread2 = new Thread(() -> {
319 threadFunction.accept(emfSpy);
// signal the waiting test that the function has returned
320 finished.countDown();
// daemon, so a stuck function cannot keep the JVM alive
323 thread2.setDaemon(true);
// fail the test if the function has not completed in time
328 assertTrue(finished.await(5, TimeUnit.SECONDS));
331 private static OperationHistoryDataManagerParamsBuilder makeBuilder() {
// Returns a params builder populated with the test defaults: an H2 in-memory
// JDBC URL named after this test class, plus BATCH_SIZE and MAX_QUEUE_LENGTH.
// Callers invoke build() themselves.
333 return OperationHistoryDataManagerParams.builder()
334 .url("jdbc:h2:mem:" + OperationHistoryDataManagerImplTest.class.getSimpleName())
337 .batchSize(BATCH_SIZE)
338 .maxQueueLength(MAX_QUEUE_LENGTH);
343 * Manager that uses the shared DB.
345 private class SharedDb extends OperationHistoryDataManagerImpl {
// Overrides the factory hook so that managers built by the tests do not each
// create their own EntityManagerFactory.
// NOTE(review): presumably returns emfSpy - confirm
351 protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) {
352 // re-use the same factory to avoid re-creating the DB for each test
358 * Manager that uses the shared DB and a pseudo thread.
360 private class PseudoThread extends SharedDb {
// Overrides makeThread() to record the command in threadFunction, which
// runThread() later executes.
// NOTE(review): presumably returns the "thread" field - confirm
363 protected Thread makeThread(EntityManagerFactory emfactory, Consumer<EntityManagerFactory> command) {
364 threadFunction = command;
370 * Manager that uses the shared DB and catches the thread.
372 private class RealThread extends SharedDb {
// Overrides makeThread() to let the superclass create the thread while
// remembering it in the "thread" field, which waitForThread() polls.
375 protected Thread makeThread(EntityManagerFactory emfactory, Consumer<EntityManagerFactory> command) {
376 thread = super.makeThread(emfactory, command);