2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2023 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.controlloop.ophistory;
import jakarta.persistence.EntityManager;
import jakarta.persistence.EntityManagerFactory;
import jakarta.persistence.Persistence;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.onap.policy.common.parameters.ValidationResult;
import org.onap.policy.common.utils.jpa.EntityMgrCloser;
import org.onap.policy.common.utils.jpa.EntityTransCloser;
import org.onap.policy.controlloop.ControlLoopOperation;
import org.onap.policy.guard.OperationsHistory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
46 * Data manager that stores records in the DB, asynchronously, using a background thread.
48 public class OperationHistoryDataManagerImpl implements OperationHistoryDataManager {
49 private static final Logger logger = LoggerFactory.getLogger(OperationHistoryDataManagerImpl.class);
52 * Added to the end of {@link #operations} when {@link #stop()} is called. This is
53 * used to get the background thread out of a blocking wait for the next record.
55 private static final Record END_MARKER = new Record();
57 // copied from the parameters
58 private final int maxQueueLength;
59 private final int batchSize;
61 private final EntityManagerFactory emFactory;
64 * Thread that takes records from {@link #operations} and stores them in the DB.
66 private Thread thread;
69 * Set to {@code true} to stop the background thread.
71 private boolean stopped = false;
74 * Queue of operations waiting to be stored in the DB. When {@link #stop()} is called,
75 * an {@link #END_MARKER} is added to the end of the queue.
77 private final BlockingQueue<Record> operations = new LinkedBlockingQueue<>();
80 * Number of records that have been processed and committed into the DB by this data
84 private long recordsCommitted = 0;
87 * Number of records that have been inserted into the DB by this data manager
88 * instance, whether or not they were committed.
91 private long recordsInserted = 0;
94 * Number of records that have been updated within the DB by this data manager
95 * instance, whether or not they were committed.
98 private long recordsUpdated = 0;
101 * Constructs the object.
103 * @param params data manager parameters
105 public OperationHistoryDataManagerImpl(OperationHistoryDataManagerParams params) {
106 ValidationResult result = params.validate("data-manager-properties");
107 if (!result.isValid()) {
108 throw new IllegalArgumentException(result.getResult());
111 this.maxQueueLength = params.getMaxQueueLength();
112 this.batchSize = params.getBatchSize();
114 // create the factory using the properties
115 var props = toProperties(params);
116 this.emFactory = makeEntityManagerFactory(params.getPersistenceUnit(), props);
120 public synchronized void start() {
121 if (stopped || thread != null) {
126 logger.info("start operation history thread");
128 thread = makeThread(emFactory, this::run);
129 thread.setDaemon(true);
134 public synchronized void stop() {
135 logger.info("requesting stop of operation history thread");
139 if (thread == null) {
140 // no thread to close the factory - do it here
144 // the thread will close the factory when it sees the end marker
145 operations.add(END_MARKER);
150 public synchronized void store(String requestId, String clName, Object event, String targetEntity,
151 ControlLoopOperation operation) {
154 logger.warn("operation history thread is stopped, discarding requestId={} event={} operation={}", requestId,
159 operations.add(new Record(requestId, clName, event, targetEntity, operation));
161 if (operations.size() > maxQueueLength) {
162 Record discarded = operations.remove();
163 logger.warn("too many items to store in the operation history table, discarding {}", discarded);
168 * Takes records from {@link #operations} and stores them in the queue. Continues to
169 * run until {@link #stop()} is invoked, or the thread is interrupted.
171 * @param factory entity manager factory
173 private void run(EntityManagerFactory factory) {
175 // store records until stopped, continuing if an exception occurs
178 Record triple = operations.take();
179 storeBatch(factory.createEntityManager(), triple);
181 } catch (RuntimeException e) {
182 logger.error("failed to save data to operation history table", e);
184 } catch (InterruptedException e) {
185 logger.error("interrupted, discarding remaining operation history data", e);
186 Thread.currentThread().interrupt();
191 storeRemainingRecords(factory);
194 synchronized (this) {
202 * Store any remaining records, but stop at the first exception.
204 * @param factory entity manager factory
206 private void storeRemainingRecords(EntityManagerFactory factory) {
208 while (!operations.isEmpty()) {
209 try (var em = factory.createEntityManager()) {
210 storeBatch(em, operations.poll());
214 } catch (RuntimeException e) {
215 logger.error("failed to save remaining data to operation history table", e);
220 * Stores a batch of records.
222 * @param entityManager entity manager
223 * @param firstRecord first record to be stored
225 private void storeBatch(EntityManager entityManager, Record firstRecord) {
226 logger.info("store operation history record batch");
228 try (var ignored = new EntityMgrCloser(entityManager);
229 var trans = new EntityTransCloser(entityManager.getTransaction())) {
232 var rec = firstRecord;
234 while (rec != null && rec != END_MARKER) {
235 storeRecord(entityManager, rec);
237 if (++nrecords >= batchSize) {
241 rec = operations.poll();
245 recordsCommitted += nrecords;
252 * @param entityMgr entity manager
253 * @param rec record to be stored
255 private void storeRecord(EntityManager entityMgr, Record rec) {
257 final String reqId = rec.getRequestId();
258 final String clName = rec.getClName();
259 final ControlLoopOperation operation = rec.getOperation();
261 logger.info("store operation history record for {}", reqId);
263 List<OperationsHistory> results = entityMgr
264 .createQuery("select e from OperationsHistory e" + " where e.closedLoopName= ?1"
265 + " and e.requestId= ?2" + " and e.subrequestId= ?3" + " and e.actor= ?4"
266 + " and e.operation= ?5" + " and e.target= ?6", OperationsHistory.class)
267 .setParameter(1, clName).setParameter(2, rec.getRequestId())
268 .setParameter(3, operation.getSubRequestId()).setParameter(4, operation.getActor())
269 .setParameter(5, operation.getOperation()).setParameter(6, rec.getTargetEntity())
272 if (results.size() > 1) {
273 logger.warn("unexpected operation history record count {} for {}", results.size(), reqId);
276 OperationsHistory entry = (results.isEmpty() ? new OperationsHistory() : results.get(0));
278 entry.setClosedLoopName(clName);
279 entry.setRequestId(rec.getRequestId());
280 entry.setActor(operation.getActor());
281 entry.setOperation(operation.getOperation());
282 entry.setTarget(rec.getTargetEntity());
283 entry.setSubrequestId(operation.getSubRequestId());
284 entry.setMessage(operation.getMessage());
285 entry.setOutcome(operation.getOutcome());
286 if (operation.getStart() != null) {
287 entry.setStarttime(new Date(operation.getStart().toEpochMilli()));
289 entry.setStarttime(null);
291 if (operation.getEnd() != null) {
292 entry.setEndtime(new Date(operation.getEnd().toEpochMilli()));
294 entry.setEndtime(null);
297 if (results.isEmpty()) {
298 logger.info("insert operation history record for {}", reqId);
300 entityMgr.persist(entry);
302 logger.info("update operation history record for {}", reqId);
304 entityMgr.merge(entry);
309 * Converts the parameters to Properties.
311 * @param params parameters to be converted
312 * @return a new property set
314 private Properties toProperties(OperationHistoryDataManagerParams params) {
315 var props = new Properties();
316 props.put("jakarta.persistence.jdbc.driver", params.getDriver());
317 props.put("jakarta.persistence.jdbc.url", params.getUrl());
318 props.put("jakarta.persistence.jdbc.user", params.getUserName());
319 props.put("jakarta.persistence.jdbc.password", params.getPassword());
320 props.put("hibernate.dialect", params.getDbHibernateDialect());
329 private static class Record {
330 private String requestId;
331 private String clName;
332 private Object event;
333 private String targetEntity;
334 private ControlLoopOperation operation;
337 // the following may be overridden by junit tests
339 protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) {
340 return Persistence.createEntityManagerFactory(opsHistPu, props);
343 protected Thread makeThread(EntityManagerFactory emfactory, Consumer<EntityManagerFactory> command) {
344 return new Thread(() -> command.accept(emfactory));