/*
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
package org.onap.policy.controlloop.ophistory;

import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.eclipse.persistence.config.PersistenceUnitProperties;
import org.onap.policy.common.parameters.ValidationResult;
import org.onap.policy.common.utils.jpa.EntityMgrCloser;
import org.onap.policy.common.utils.jpa.EntityTransCloser;
import org.onap.policy.controlloop.ControlLoopOperation;
import org.onap.policy.guard.OperationsHistory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

46 * Data manager that stores records in the DB, asynchronously, using a background thread.
48 public class OperationHistoryDataManagerImpl implements OperationHistoryDataManager {
49 private static final Logger logger = LoggerFactory.getLogger(OperationHistoryDataManagerImpl.class);
52 * Added to the end of {@link #operations} when {@link #stop()} is called. This is
53 * used to get the background thread out of a blocking wait for the next record.
55 private static final Record END_MARKER = new Record();
57 // copied from the parameters
58 private final int maxQueueLength;
59 private final int batchSize;
61 private final EntityManagerFactory emFactory;
64 * Thread that takes records from {@link #operations} and stores them in the DB.
66 private Thread thread;
69 * Set to {@code true} to stop the background thread.
71 private boolean stopped = false;
74 * Queue of operations waiting to be stored in the DB. When {@link #stop()} is called,
75 * an {@link #END_MARKER} is added to the end of the queue.
77 private final BlockingQueue<Record> operations = new LinkedBlockingQueue<>();
80 * Number of records that have been processed and committed into the DB by this data
84 private long recordsCommitted = 0;
87 * Number of records that have been inserted into the DB by this data manager
88 * instance, whether or not they were committed.
91 private long recordsInserted = 0;
94 * Number of records that have been updated within the DB by this data manager
95 * instance, whether or not they were committed.
98 private long recordsUpdated = 0;
102 * Constructs the object.
104 * @param params data manager parameters
106 public OperationHistoryDataManagerImpl(OperationHistoryDataManagerParams params) {
107 ValidationResult result = params.validate("data-manager-properties");
108 if (!result.isValid()) {
109 throw new IllegalArgumentException(result.getResult());
112 this.maxQueueLength = params.getMaxQueueLength();
113 this.batchSize = params.getBatchSize();
115 // create the factory using the properties
116 var props = toProperties(params);
117 this.emFactory = makeEntityManagerFactory(params.getPersistenceUnit(), props);
121 public synchronized void start() {
122 if (stopped || thread != null) {
127 logger.info("start operation history thread");
129 thread = makeThread(emFactory, this::run);
130 thread.setDaemon(true);
135 public synchronized void stop() {
136 logger.info("requesting stop of operation history thread");
140 if (thread == null) {
141 // no thread to close the factory - do it here
145 // the thread will close the factory when it sees the end marker
146 operations.add(END_MARKER);
151 public synchronized void store(String requestId, String clName, Object event, String targetEntity,
152 ControlLoopOperation operation) {
155 logger.warn("operation history thread is stopped, discarding requestId={} event={} operation={}", requestId,
160 operations.add(new Record(requestId, clName, event, targetEntity, operation));
162 if (operations.size() > maxQueueLength) {
163 Record discarded = operations.remove();
164 logger.warn("too many items to store in the operation history table, discarding {}", discarded);
169 * Takes records from {@link #operations} and stores them in the queue. Continues to
170 * run until {@link #stop()} is invoked, or the thread is interrupted.
172 * @param emfactory entity manager factory
174 private void run(EntityManagerFactory emfactory) {
176 // store records until stopped, continuing if an exception occurs
179 Record triple = operations.take();
180 storeBatch(emfactory.createEntityManager(), triple);
182 } catch (RuntimeException e) {
183 logger.error("failed to save data to operation history table", e);
185 } catch (InterruptedException e) {
186 logger.error("interrupted, discarding remaining operation history data", e);
187 Thread.currentThread().interrupt();
192 storeRemainingRecords(emfactory);
195 synchronized (this) {
204 * Store any remaining records, but stop at the first exception.
206 * @param emfactory entity manager factory
208 private void storeRemainingRecords(EntityManagerFactory emfactory) {
210 while (!operations.isEmpty()) {
211 storeBatch(emfactory.createEntityManager(), operations.poll());
214 } catch (RuntimeException e) {
215 logger.error("failed to save remaining data to operation history table", e);
220 * Stores a batch of records.
222 * @param entityManager entity manager
223 * @param firstRecord first record to be stored
225 private void storeBatch(EntityManager entityManager, Record firstRecord) {
226 logger.info("store operation history record batch");
228 try (var emc = new EntityMgrCloser(entityManager);
229 var trans = new EntityTransCloser(entityManager.getTransaction())) {
232 var rec = firstRecord;
234 while (rec != null && rec != END_MARKER) {
235 storeRecord(entityManager, rec);
237 if (++nrecords >= batchSize) {
241 rec = operations.poll();
245 recordsCommitted += nrecords;
252 * @param entityMgr entity manager
253 * @param rec record to be stored
255 private void storeRecord(EntityManager entityMgr, Record rec) {
257 final String reqId = rec.getRequestId();
258 final String clName = rec.getClName();
259 final ControlLoopOperation operation = rec.getOperation();
261 logger.info("store operation history record for {}", reqId);
263 List<OperationsHistory> results = entityMgr
264 .createQuery("select e from OperationsHistory e" + " where e.closedLoopName= ?1"
265 + " and e.requestId= ?2" + " and e.subrequestId= ?3" + " and e.actor= ?4"
266 + " and e.operation= ?5" + " and e.target= ?6", OperationsHistory.class)
267 .setParameter(1, clName).setParameter(2, rec.getRequestId())
268 .setParameter(3, operation.getSubRequestId()).setParameter(4, operation.getActor())
269 .setParameter(5, operation.getOperation()).setParameter(6, rec.getTargetEntity())
272 if (results.size() > 1) {
273 logger.warn("unexpected operation history record count {} for {}", results.size(), reqId);
276 OperationsHistory entry = (results.isEmpty() ? new OperationsHistory() : results.get(0));
278 entry.setClosedLoopName(clName);
279 entry.setRequestId(rec.getRequestId());
280 entry.setActor(operation.getActor());
281 entry.setOperation(operation.getOperation());
282 entry.setTarget(rec.getTargetEntity());
283 entry.setSubrequestId(operation.getSubRequestId());
284 entry.setMessage(operation.getMessage());
285 entry.setOutcome(operation.getOutcome());
286 if (operation.getStart() != null) {
287 entry.setStarttime(new Date(operation.getStart().toEpochMilli()));
289 entry.setStarttime(null);
291 if (operation.getEnd() != null) {
292 entry.setEndtime(new Date(operation.getEnd().toEpochMilli()));
294 entry.setEndtime(null);
297 if (results.isEmpty()) {
298 logger.info("insert operation history record for {}", reqId);
300 entityMgr.persist(entry);
302 logger.info("update operation history record for {}", reqId);
304 entityMgr.merge(entry);
309 * Converts the parameters to Properties.
311 * @param params parameters to be converted
312 * @return a new property set
314 private Properties toProperties(OperationHistoryDataManagerParams params) {
315 var props = new Properties();
316 props.put(PersistenceUnitProperties.JDBC_DRIVER, params.getDriver());
317 props.put(PersistenceUnitProperties.JDBC_URL, params.getUrl());
318 props.put(PersistenceUnitProperties.JDBC_USER, params.getUserName());
319 props.put(PersistenceUnitProperties.JDBC_PASSWORD, params.getPassword());
320 props.put(PersistenceUnitProperties.TARGET_DATABASE, params.getDbType());
321 props.put(PersistenceUnitProperties.CLASSLOADER, getClass().getClassLoader());
330 private static class Record {
331 private String requestId;
332 private String clName;
333 private Object event;
334 private String targetEntity;
335 private ControlLoopOperation operation;
338 // the following may be overridden by junit tests
340 protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) {
341 return Persistence.createEntityManagerFactory(opsHistPu, props);
344 protected Thread makeThread(EntityManagerFactory emfactory, Consumer<EntityManagerFactory> command) {
345 return new Thread(() -> command.accept(emfactory));