/*-
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */

package org.onap.policy.controlloop.ophistory;

import java.util.Date;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.eclipse.persistence.config.PersistenceUnitProperties;
import org.onap.policy.common.parameters.ValidationResult;
import org.onap.policy.common.utils.jpa.EntityMgrCloser;
import org.onap.policy.common.utils.jpa.EntityTransCloser;
import org.onap.policy.controlloop.ControlLoopOperation;
import org.onap.policy.controlloop.VirtualControlLoopEvent;
import org.onap.policy.database.operationshistory.Dbao;
import org.onap.policy.guard.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

47 * Data manager that stores records in the DB, asynchronously, using a background thread.
49 public class OperationHistoryDataManagerImpl implements OperationHistoryDataManager {
50 private static final Logger logger = LoggerFactory.getLogger(OperationHistoryDataManagerImpl.class);
53 * Added to the end of {@link #operations} when {@link #stop()} is called. This is
54 * used to get the background thread out of a blocking wait for the next record.
56 private static final Record END_MARKER = new Record();
58 // copied from the parameters
59 private final int maxQueueLength;
60 private final int batchSize;
62 private final EntityManagerFactory emFactory;
65 * Thread that takes records from {@link #operations} and stores them in the DB.
67 private Thread thread;
70 * Set to {@code true} to stop the background thread.
72 private boolean stopped = false;
75 * Queue of operations waiting to be stored in the DB. When {@link #stop()} is called,
76 * an {@link #END_MARKER} is added to the end of the queue.
78 private final BlockingQueue<Record> operations = new LinkedBlockingQueue<>();
81 * Number of records that have been added to the DB by this data manager instance.
84 private long recordsAdded = 0;
88 * Constructs the object.
90 * @param params data manager parameters
92 public OperationHistoryDataManagerImpl(OperationHistoryDataManagerParams params) {
93 ValidationResult result = params.validate("data-manager-properties");
94 if (!result.isValid()) {
95 throw new IllegalArgumentException(result.getResult());
98 this.maxQueueLength = params.getMaxQueueLength();
99 this.batchSize = params.getBatchSize();
101 // create the factory using the properties
102 Properties props = toProperties(params);
103 this.emFactory = makeEntityManagerFactory(params.getPersistenceUnit(), props);
107 public synchronized void start() {
108 if (stopped || thread != null) {
113 thread = makeThread(emFactory, this::run);
114 thread.setDaemon(true);
119 public synchronized void stop() {
122 if (thread == null) {
123 // no thread to close the factory - do it here
127 // the thread will close the factory when it sees the end marker
128 operations.add(END_MARKER);
133 public synchronized void store(String requestId, VirtualControlLoopEvent event, ControlLoopOperation operation) {
136 logger.warn("operation history thread is stopped, discarding requestId={} event={} operation={}", requestId,
141 operations.add(new Record(requestId, event, operation));
143 if (operations.size() > maxQueueLength) {
144 Record discarded = operations.remove();
145 logger.warn("too many items to store in the operation history table, discarding {}", discarded);
150 * Takes records from {@link #operations} and stores them in the queue. Continues to
151 * run until {@link #stop()} is invoked, or the thread is interrupted.
153 * @param emfactory entity manager factory
155 private void run(EntityManagerFactory emfactory) {
157 // store records until stopped, continuing if an exception occurs
160 Record triple = operations.take();
161 storeBatch(emfactory.createEntityManager(), triple);
163 } catch (RuntimeException e) {
164 logger.error("failed to save data to operation history table", e);
166 } catch (InterruptedException e) {
167 logger.error("interrupted, discarding remaining operation history data", e);
168 Thread.currentThread().interrupt();
173 storeRemainingRecords(emfactory);
176 synchronized (this) {
185 * Store any remaining records, but stop at the first exception.
187 * @param emfactory entity manager factory
189 private void storeRemainingRecords(EntityManagerFactory emfactory) {
191 while (!operations.isEmpty()) {
192 storeBatch(emfactory.createEntityManager(), operations.poll());
195 } catch (RuntimeException e) {
196 logger.error("failed to save remaining data to operation history table", e);
201 * Stores a batch of records.
203 * @param entityManager entity manager
204 * @param firstRecord first record to be stored
206 private void storeBatch(EntityManager entityManager, Record firstRecord) {
208 try (EntityMgrCloser emc = new EntityMgrCloser(entityManager);
209 EntityTransCloser trans = new EntityTransCloser(entityManager.getTransaction())) {
212 Record record = firstRecord;
214 while (record != null && record != END_MARKER) {
215 storeRecord(entityManager, record);
217 if (++nrecords >= batchSize) {
221 record = operations.poll();
225 recordsAdded += nrecords;
232 * @param entityManager entity manager
233 * @param record record to be stored
235 private void storeRecord(EntityManager entityMgr, Record record) {
237 Dbao newEntry = new Dbao();
239 final VirtualControlLoopEvent event = record.getEvent();
240 final ControlLoopOperation operation = record.getOperation();
242 newEntry.setClosedLoopName(event.getClosedLoopControlName());
243 newEntry.setRequestId(record.getRequestId());
244 newEntry.setActor(operation.getActor());
245 newEntry.setOperation(operation.getOperation());
246 newEntry.setTarget(operation.getTarget());
247 newEntry.setSubrequestId(operation.getSubRequestId());
248 newEntry.setMessage(operation.getMessage());
249 newEntry.setOutcome(operation.getOutcome());
250 if (operation.getStart() != null) {
251 newEntry.setStarttime(new Date(operation.getStart().toEpochMilli()));
253 if (operation.getEnd() != null) {
254 newEntry.setEndtime(new Date(operation.getEnd().toEpochMilli()));
257 entityMgr.persist(newEntry);
261 * Converts the parameters to Properties.
263 * @param params parameters to be converted
264 * @return a new property set
266 private Properties toProperties(OperationHistoryDataManagerParams params) {
267 Properties props = new Properties();
268 props.put(Util.ECLIPSE_LINK_KEY_URL, params.getUrl());
269 props.put(Util.ECLIPSE_LINK_KEY_USER, params.getUserName());
270 props.put(Util.ECLIPSE_LINK_KEY_PASS, params.getPassword());
271 props.put(PersistenceUnitProperties.CLASSLOADER, getClass().getClassLoader());
280 private static class Record {
281 private String requestId;
282 private VirtualControlLoopEvent event;
283 private ControlLoopOperation operation;
286 // the following may be overridden by junit tests
288 protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) {
289 return Persistence.createEntityManagerFactory(opsHistPu, props);
292 protected Thread makeThread(EntityManagerFactory emfactory, Consumer<EntityManagerFactory> command) {
293 return new Thread(() -> command.accept(emfactory));