/*-
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
 * Modifications Copyright (C) 2023 Nordix Foundation.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */

package org.onap.policy.controlloop.ophistory;

import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.onap.policy.common.parameters.ValidationResult;
import org.onap.policy.common.utils.jpa.EntityMgrCloser;
import org.onap.policy.common.utils.jpa.EntityTransCloser;
import org.onap.policy.controlloop.ControlLoopOperation;
import org.onap.policy.guard.OperationsHistory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

46 * Data manager that stores records in the DB, asynchronously, using a background thread.
48 public class OperationHistoryDataManagerImpl implements OperationHistoryDataManager {
49 private static final Logger logger = LoggerFactory.getLogger(OperationHistoryDataManagerImpl.class);
52 * Added to the end of {@link #operations} when {@link #stop()} is called. This is
53 * used to get the background thread out of a blocking wait for the next record.
55 private static final Record END_MARKER = new Record();
57 // copied from the parameters
58 private final int maxQueueLength;
59 private final int batchSize;
61 private final EntityManagerFactory emFactory;
64 * Thread that takes records from {@link #operations} and stores them in the DB.
66 private Thread thread;
69 * Set to {@code true} to stop the background thread.
71 private boolean stopped = false;
74 * Queue of operations waiting to be stored in the DB. When {@link #stop()} is called,
75 * an {@link #END_MARKER} is added to the end of the queue.
77 private final BlockingQueue<Record> operations = new LinkedBlockingQueue<>();
80 * Number of records that have been processed and committed into the DB by this data
84 private long recordsCommitted = 0;
87 * Number of records that have been inserted into the DB by this data manager
88 * instance, whether or not they were committed.
91 private long recordsInserted = 0;
94 * Number of records that have been updated within the DB by this data manager
95 * instance, whether or not they were committed.
98 private long recordsUpdated = 0;
101 * Constructs the object.
103 * @param params data manager parameters
105 public OperationHistoryDataManagerImpl(OperationHistoryDataManagerParams params) {
106 ValidationResult result = params.validate("data-manager-properties");
107 if (!result.isValid()) {
108 throw new IllegalArgumentException(result.getResult());
111 this.maxQueueLength = params.getMaxQueueLength();
112 this.batchSize = params.getBatchSize();
114 // create the factory using the properties
115 var props = toProperties(params);
116 this.emFactory = makeEntityManagerFactory(params.getPersistenceUnit(), props);
120 public synchronized void start() {
121 if (stopped || thread != null) {
126 logger.info("start operation history thread");
128 thread = makeThread(emFactory, this::run);
129 thread.setDaemon(true);
134 public synchronized void stop() {
135 logger.info("requesting stop of operation history thread");
139 if (thread == null) {
140 // no thread to close the factory - do it here
144 // the thread will close the factory when it sees the end marker
145 operations.add(END_MARKER);
150 public synchronized void store(String requestId, String clName, Object event, String targetEntity,
151 ControlLoopOperation operation) {
154 logger.warn("operation history thread is stopped, discarding requestId={} event={} operation={}", requestId,
159 operations.add(new Record(requestId, clName, event, targetEntity, operation));
161 if (operations.size() > maxQueueLength) {
162 Record discarded = operations.remove();
163 logger.warn("too many items to store in the operation history table, discarding {}", discarded);
168 * Takes records from {@link #operations} and stores them in the queue. Continues to
169 * run until {@link #stop()} is invoked, or the thread is interrupted.
171 * @param emfactory entity manager factory
173 private void run(EntityManagerFactory emfactory) {
175 // store records until stopped, continuing if an exception occurs
178 Record triple = operations.take();
179 storeBatch(emfactory.createEntityManager(), triple);
181 } catch (RuntimeException e) {
182 logger.error("failed to save data to operation history table", e);
184 } catch (InterruptedException e) {
185 logger.error("interrupted, discarding remaining operation history data", e);
186 Thread.currentThread().interrupt();
191 storeRemainingRecords(emfactory);
194 synchronized (this) {
203 * Store any remaining records, but stop at the first exception.
205 * @param emfactory entity manager factory
207 private void storeRemainingRecords(EntityManagerFactory emfactory) {
209 while (!operations.isEmpty()) {
210 storeBatch(emfactory.createEntityManager(), operations.poll());
213 } catch (RuntimeException e) {
214 logger.error("failed to save remaining data to operation history table", e);
219 * Stores a batch of records.
221 * @param entityManager entity manager
222 * @param firstRecord first record to be stored
224 private void storeBatch(EntityManager entityManager, Record firstRecord) {
225 logger.info("store operation history record batch");
227 try (var emc = new EntityMgrCloser(entityManager);
228 var trans = new EntityTransCloser(entityManager.getTransaction())) {
231 var rec = firstRecord;
233 while (rec != null && rec != END_MARKER) {
234 storeRecord(entityManager, rec);
236 if (++nrecords >= batchSize) {
240 rec = operations.poll();
244 recordsCommitted += nrecords;
251 * @param entityMgr entity manager
252 * @param rec record to be stored
254 private void storeRecord(EntityManager entityMgr, Record rec) {
256 final String reqId = rec.getRequestId();
257 final String clName = rec.getClName();
258 final ControlLoopOperation operation = rec.getOperation();
260 logger.info("store operation history record for {}", reqId);
262 List<OperationsHistory> results = entityMgr
263 .createQuery("select e from OperationsHistory e" + " where e.closedLoopName= ?1"
264 + " and e.requestId= ?2" + " and e.subrequestId= ?3" + " and e.actor= ?4"
265 + " and e.operation= ?5" + " and e.target= ?6", OperationsHistory.class)
266 .setParameter(1, clName).setParameter(2, rec.getRequestId())
267 .setParameter(3, operation.getSubRequestId()).setParameter(4, operation.getActor())
268 .setParameter(5, operation.getOperation()).setParameter(6, rec.getTargetEntity())
271 if (results.size() > 1) {
272 logger.warn("unexpected operation history record count {} for {}", results.size(), reqId);
275 OperationsHistory entry = (results.isEmpty() ? new OperationsHistory() : results.get(0));
277 entry.setClosedLoopName(clName);
278 entry.setRequestId(rec.getRequestId());
279 entry.setActor(operation.getActor());
280 entry.setOperation(operation.getOperation());
281 entry.setTarget(rec.getTargetEntity());
282 entry.setSubrequestId(operation.getSubRequestId());
283 entry.setMessage(operation.getMessage());
284 entry.setOutcome(operation.getOutcome());
285 if (operation.getStart() != null) {
286 entry.setStarttime(new Date(operation.getStart().toEpochMilli()));
288 entry.setStarttime(null);
290 if (operation.getEnd() != null) {
291 entry.setEndtime(new Date(operation.getEnd().toEpochMilli()));
293 entry.setEndtime(null);
296 if (results.isEmpty()) {
297 logger.info("insert operation history record for {}", reqId);
299 entityMgr.persist(entry);
301 logger.info("update operation history record for {}", reqId);
303 entityMgr.merge(entry);
308 * Converts the parameters to Properties.
310 * @param params parameters to be converted
311 * @return a new property set
313 private Properties toProperties(OperationHistoryDataManagerParams params) {
314 var props = new Properties();
315 props.put("javax.persistence.jdbc.driver", params.getDriver());
316 props.put("javax.persistence.jdbc.url", params.getUrl());
317 props.put("javax.persistence.jdbc.user", params.getUserName());
318 props.put("javax.persistence.jdbc.password", params.getPassword());
319 props.put("hibernate.dialect", params.getDbHibernateDialect());
328 private static class Record {
329 private String requestId;
330 private String clName;
331 private Object event;
332 private String targetEntity;
333 private ControlLoopOperation operation;
336 // the following may be overridden by junit tests
338 protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) {
339 return Persistence.createEntityManagerFactory(opsHistPu, props);
342 protected Thread makeThread(EntityManagerFactory emfactory, Consumer<EntityManagerFactory> command) {
343 return new Thread(() -> command.accept(emfactory));