e6c66d120f9cb1a4a80696ff653638f979fc7cc8
[policy/drools-applications.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.ophistory;
22
23 import static org.assertj.core.api.Assertions.assertThatCode;
24 import static org.awaitility.Awaitility.await;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertTrue;
27 import static org.mockito.Mockito.doAnswer;
28 import static org.mockito.Mockito.mock;
29 import static org.mockito.Mockito.never;
30 import static org.mockito.Mockito.spy;
31 import static org.mockito.Mockito.verify;
32 import static org.mockito.Mockito.when;
33
34 import java.time.Instant;
35 import java.util.Properties;
36 import java.util.UUID;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicInteger;
40 import java.util.function.Consumer;
41 import javax.persistence.EntityManagerFactory;
42 import org.junit.After;
43 import org.junit.AfterClass;
44 import org.junit.Before;
45 import org.junit.BeforeClass;
46 import org.junit.Test;
47 import org.mockito.Mock;
48 import org.mockito.MockitoAnnotations;
49 import org.onap.policy.controlloop.ControlLoopOperation;
50 import org.onap.policy.controlloop.VirtualControlLoopEvent;
51 import org.onap.policy.controlloop.ophistory.OperationHistoryDataManagerParams.OperationHistoryDataManagerParamsBuilder;
52
53 public class OperationHistoryDataManagerImplTest {
54
55     private static final IllegalStateException EXPECTED_EXCEPTION = new IllegalStateException("expected exception");
56     private static final String MY_TARGET = "my-target";
57     private static final String MY_ENTITY = "my-entity";
58     private static final String REQ_ID = "my-request-id";
59     private static final int BATCH_SIZE = 5;
60     private static final int MAX_QUEUE_LENGTH = 23;
61
62     private static EntityManagerFactory emf;
63
64     @Mock
65     private Thread thread;
66
67     private OperationHistoryDataManagerParams params;
68     private Consumer<EntityManagerFactory> threadFunction;
69     private VirtualControlLoopEvent event;
70     private ControlLoopOperation operation;
71     private EntityManagerFactory emfSpy;
72
73     // decremented when the thread function completes
74     private CountDownLatch finished;
75
76     private OperationHistoryDataManagerImpl mgr;
77
78
79     /**
80      * Sets up for all tests.
81      */
82     @BeforeClass
83     public static void setUpBeforeClass() {
84         OperationHistoryDataManagerParams params = makeBuilder().build();
85
86         // capture the entity manager factory for re-use
87         new OperationHistoryDataManagerImpl(params) {
88             @Override
89             protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) {
90                 emf = super.makeEntityManagerFactory(opsHistPu, props);
91                 return emf;
92             }
93         };
94     }
95
96     /**
97      * Restores the environment after all tests.
98      */
99     @AfterClass
100     public static void tearDownAfterClass() {
101         emf.close();
102     }
103
104     /**
105      * Sets up for an individual test.
106      */
107     @Before
108     public void setUp() {
109         MockitoAnnotations.initMocks(this);
110
111         event = new VirtualControlLoopEvent();
112         event.setRequestId(UUID.randomUUID());
113
114         operation = new ControlLoopOperation();
115         operation.setTarget(MY_TARGET);
116
117         threadFunction = null;
118         finished = new CountDownLatch(1);
119
120         // prevent the "real" emf from being closed
121         emfSpy = spy(emf);
122         doAnswer(ans -> null).when(emfSpy).close();
123
124         params = makeBuilder().build();
125
126         mgr = new PseudoThread();
127         mgr.start();
128     }
129
130     @After
131     public void tearDown() {
132         mgr.stop();
133     }
134
135     @Test
136     public void testConstructor() {
137         // use a thread and manager that haven't been started yet
138         thread = mock(Thread.class);
139         mgr = new PseudoThread();
140
141         // should not start the thread before start() is called
142         verify(thread, never()).start();
143
144         mgr.start();
145
146         // should have started the thread
147         verify(thread).start();
148
149         // invalid properties
150         params.setUrl(null);
151         assertThatCode(() -> new PseudoThread()).isInstanceOf(IllegalArgumentException.class)
152                         .hasMessageContaining("data-manager-properties");
153     }
154
155     @Test
156     public void testStart() {
157         // this should have no effect
158         mgr.start();
159
160         mgr.stop();
161
162         // this should also have no effect
163         assertThatCode(() -> mgr.start()).doesNotThrowAnyException();
164     }
165
166     @Test
167     public void testStore_testStop() throws InterruptedException {
168         // store
169         mgr.store(REQ_ID, event, MY_ENTITY, operation);
170
171         runThread();
172
173         assertEquals(1, mgr.getRecordsAdded());
174     }
175
176     /**
177      * Tests stop() when the manager isn't running.
178      */
179     @Test
180     public void testStopNotRunning() {
181         // use a manager that hasn't been started yet
182         mgr = new PseudoThread();
183         mgr.stop();
184
185         verify(emfSpy).close();
186     }
187
188     /**
189      * Tests store() when it is already stopped.
190      */
191     @Test
192     public void testStoreAlreadyStopped() throws InterruptedException {
193         mgr.stop();
194
195         // store
196         mgr.store(REQ_ID, event, MY_ENTITY, operation);
197
198         assertEquals(0, mgr.getRecordsAdded());
199     }
200
201     /**
202      * Tests store() when when the queue is full.
203      */
204     @Test
205     public void testStoreTooManyItems() throws InterruptedException {
206         final int nextra = 5;
207         for (int nitems = 0; nitems < MAX_QUEUE_LENGTH + nextra; ++nitems) {
208             mgr.store(REQ_ID, event, MY_ENTITY, operation);
209         }
210
211         runThread();
212
213         assertEquals(MAX_QUEUE_LENGTH, mgr.getRecordsAdded());
214     }
215
216     @Test
217     public void testRun() throws InterruptedException {
218
219         // trigger thread shutdown when it completes this batch
220         when(emfSpy.createEntityManager()).thenAnswer(ans -> {
221             mgr.stop();
222             return emf.createEntityManager();
223         });
224
225
226         mgr = new RealThread();
227         mgr.start();
228
229         mgr.store(REQ_ID, event, MY_ENTITY, operation);
230         mgr.store(REQ_ID, event, MY_ENTITY, operation);
231         mgr.store(REQ_ID, event, MY_ENTITY, operation);
232
233         waitForThread();
234
235         verify(emfSpy).close();
236
237         assertEquals(3, mgr.getRecordsAdded());
238     }
239
240     private void waitForThread() {
241         await().atMost(5, TimeUnit.SECONDS).until(() -> !thread.isAlive());
242     }
243
244     /**
245      * Tests run() when the entity manager throws an exception.
246      */
247     @Test
248     public void testRunException() throws InterruptedException {
249         AtomicInteger count = new AtomicInteger(0);
250
251         when(emfSpy.createEntityManager()).thenAnswer(ans -> {
252             if (count.incrementAndGet() == 2) {
253                 // interrupt during one of the attempts
254                 thread.interrupt();
255             }
256
257             // throw an exception for each record
258             throw EXPECTED_EXCEPTION;
259         });
260
261
262         mgr = new RealThread();
263         mgr.start();
264
265         mgr.store(REQ_ID, event, MY_ENTITY, operation);
266         mgr.store(REQ_ID, event, MY_ENTITY, operation);
267         mgr.store(REQ_ID, event, MY_ENTITY, operation);
268
269         waitForThread();
270
271         verify(emfSpy).close();
272     }
273
274     /**
275      * Tests storeRemainingRecords() when the entity manager throws an exception.
276      */
277     @Test
278     public void testStoreRemainingRecordsException() throws InterruptedException {
279         // arrange to throw an exception
280         when(emfSpy.createEntityManager()).thenThrow(EXPECTED_EXCEPTION);
281
282         mgr.store(REQ_ID, event, MY_ENTITY, operation);
283
284         runThread();
285     }
286
287     @Test
288     public void testStoreRecord() throws InterruptedException {
289         // no start time
290         mgr.store(REQ_ID, event, MY_ENTITY, operation);
291
292         // no start time
293         operation = new ControlLoopOperation(operation);
294         operation.setStart(Instant.now());
295         mgr.store(REQ_ID, event, MY_ENTITY, operation);
296
297         // both start and end times
298         operation = new ControlLoopOperation(operation);
299         operation.setEnd(Instant.now());
300         mgr.store(REQ_ID, event, MY_ENTITY, operation);
301
302         // only end time
303         operation = new ControlLoopOperation(operation);
304         operation.setStart(null);
305         mgr.store(REQ_ID, event, MY_ENTITY, operation);
306
307         runThread();
308
309         // all of them should have been stored
310         assertEquals(4, mgr.getRecordsAdded());
311     }
312
313     private void runThread() throws InterruptedException {
314         if (threadFunction == null) {
315             return;
316         }
317
318         Thread thread2 = new Thread(() -> {
319             threadFunction.accept(emfSpy);
320             finished.countDown();
321         });
322
323         thread2.setDaemon(true);
324         thread2.start();
325
326         mgr.stop();
327
328         assertTrue(finished.await(5, TimeUnit.SECONDS));
329     }
330
331     private static OperationHistoryDataManagerParamsBuilder makeBuilder() {
332         // @formatter:off
333         return OperationHistoryDataManagerParams.builder()
334                         .url("jdbc:h2:mem:" + OperationHistoryDataManagerImplTest.class.getSimpleName())
335                         .userName("sa")
336                         .password("")
337                         .batchSize(BATCH_SIZE)
338                         .maxQueueLength(MAX_QUEUE_LENGTH);
339         // @formatter:on
340     }
341
342     /**
343      * Manager that uses the shared DB.
344      */
345     private class SharedDb extends OperationHistoryDataManagerImpl {
346         public SharedDb() {
347             super(params);
348         }
349
350         @Override
351         protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) {
352             // re-use the same factory to avoid re-creating the DB for each test
353             return emfSpy;
354         }
355     }
356
357     /**
358      * Manager that uses the shared DB and a pseudo thread.
359      */
360     private class PseudoThread extends SharedDb {
361
362         @Override
363         protected Thread makeThread(EntityManagerFactory emfactory, Consumer<EntityManagerFactory> command) {
364             threadFunction = command;
365             return thread;
366         }
367     }
368
369     /**
370      * Manager that uses the shared DB and catches the thread.
371      */
372     private class RealThread extends SharedDb {
373
374         @Override
375         protected Thread makeThread(EntityManagerFactory emfactory, Consumer<EntityManagerFactory> command) {
376             thread = super.makeThread(emfactory, command);
377             return thread;
378         }
379     }
380 }