/*
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
21 package org.onap.policy.controlloop.ophistory;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.eclipse.persistence.config.PersistenceUnitProperties;
import org.onap.policy.common.parameters.ValidationResult;
import org.onap.policy.common.utils.jpa.EntityMgrCloser;
import org.onap.policy.common.utils.jpa.EntityTransCloser;
import org.onap.policy.controlloop.ControlLoopOperation;
import org.onap.policy.controlloop.VirtualControlLoopEvent;
import org.onap.policy.guard.OperationsHistory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Data manager that stores records in the DB, asynchronously, using a background thread.
 */
49 public class OperationHistoryDataManagerImpl implements OperationHistoryDataManager {
50 private static final Logger logger = LoggerFactory.getLogger(OperationHistoryDataManagerImpl.class);
53 * Added to the end of {@link #operations} when {@link #stop()} is called. This is
54 * used to get the background thread out of a blocking wait for the next record.
56 private static final Record END_MARKER = new Record();
58 // copied from the parameters
59 private final int maxQueueLength;
60 private final int batchSize;
62 private final EntityManagerFactory emFactory;
65 * Thread that takes records from {@link #operations} and stores them in the DB.
67 private Thread thread;
70 * Set to {@code true} to stop the background thread.
72 private boolean stopped = false;
75 * Queue of operations waiting to be stored in the DB. When {@link #stop()} is called,
76 * an {@link #END_MARKER} is added to the end of the queue.
78 private final BlockingQueue<Record> operations = new LinkedBlockingQueue<>();
81 * Number of records that have been processed and committed into the DB by this data
85 private long recordsCommitted = 0;
88 * Number of records that have been inserted into the DB by this data manager
89 * instance, whether or not they were committed.
92 private long recordsInserted = 0;
95 * Number of records that have been updated within the DB by this data manager
96 * instance, whether or not they were committed.
99 private long recordsUpdated = 0;
103 * Constructs the object.
105 * @param params data manager parameters
107 public OperationHistoryDataManagerImpl(OperationHistoryDataManagerParams params) {
108 ValidationResult result = params.validate("data-manager-properties");
109 if (!result.isValid()) {
110 throw new IllegalArgumentException(result.getResult());
113 this.maxQueueLength = params.getMaxQueueLength();
114 this.batchSize = params.getBatchSize();
116 // create the factory using the properties
117 Properties props = toProperties(params);
118 this.emFactory = makeEntityManagerFactory(params.getPersistenceUnit(), props);
122 public synchronized void start() {
123 if (stopped || thread != null) {
128 logger.info("start operation history thread");
130 thread = makeThread(emFactory, this::run);
131 thread.setDaemon(true);
136 public synchronized void stop() {
137 logger.info("requesting stop of operation history thread");
141 if (thread == null) {
142 // no thread to close the factory - do it here
146 // the thread will close the factory when it sees the end marker
147 operations.add(END_MARKER);
152 public synchronized void store(String requestId, VirtualControlLoopEvent event, String targetEntity,
153 ControlLoopOperation operation) {
156 logger.warn("operation history thread is stopped, discarding requestId={} event={} operation={}", requestId,
161 operations.add(new Record(requestId, event, targetEntity, operation));
163 if (operations.size() > maxQueueLength) {
164 Record discarded = operations.remove();
165 logger.warn("too many items to store in the operation history table, discarding {}", discarded);
170 * Takes records from {@link #operations} and stores them in the queue. Continues to
171 * run until {@link #stop()} is invoked, or the thread is interrupted.
173 * @param emfactory entity manager factory
175 private void run(EntityManagerFactory emfactory) {
177 // store records until stopped, continuing if an exception occurs
180 Record triple = operations.take();
181 storeBatch(emfactory.createEntityManager(), triple);
183 } catch (RuntimeException e) {
184 logger.error("failed to save data to operation history table", e);
186 } catch (InterruptedException e) {
187 logger.error("interrupted, discarding remaining operation history data", e);
188 Thread.currentThread().interrupt();
193 storeRemainingRecords(emfactory);
196 synchronized (this) {
205 * Store any remaining records, but stop at the first exception.
207 * @param emfactory entity manager factory
209 private void storeRemainingRecords(EntityManagerFactory emfactory) {
211 while (!operations.isEmpty()) {
212 storeBatch(emfactory.createEntityManager(), operations.poll());
215 } catch (RuntimeException e) {
216 logger.error("failed to save remaining data to operation history table", e);
221 * Stores a batch of records.
223 * @param entityManager entity manager
224 * @param firstRecord first record to be stored
226 private void storeBatch(EntityManager entityManager, Record firstRecord) {
227 logger.info("store operation history record batch");
229 try (EntityMgrCloser emc = new EntityMgrCloser(entityManager);
230 EntityTransCloser trans = new EntityTransCloser(entityManager.getTransaction())) {
233 Record record = firstRecord;
235 while (record != null && record != END_MARKER) {
236 storeRecord(entityManager, record);
238 if (++nrecords >= batchSize) {
242 record = operations.poll();
246 recordsCommitted += nrecords;
253 * @param entityManager entity manager
254 * @param record record to be stored
256 private void storeRecord(EntityManager entityMgr, Record record) {
258 final VirtualControlLoopEvent event = record.getEvent();
259 final ControlLoopOperation operation = record.getOperation();
261 logger.info("store operation history record for {}", event.getRequestId());
263 List<OperationsHistory> results =
264 entityMgr.createQuery("select e from OperationsHistory e"
265 + " where e.closedLoopName= ?1"
266 + " and e.requestId= ?2"
267 + " and e.subrequestId= ?3"
269 + " and e.operation= ?5"
270 + " and e.target= ?6",
271 OperationsHistory.class)
272 .setParameter(1, event.getClosedLoopControlName())
273 .setParameter(2, record.getRequestId())
274 .setParameter(3, operation.getSubRequestId())
275 .setParameter(4, operation.getActor())
276 .setParameter(5, operation.getOperation())
277 .setParameter(6, record.getTargetEntity())
280 if (results.size() > 1) {
281 logger.warn("unexpected operation history record count {} for {}", results.size(), event.getRequestId());
284 OperationsHistory entry = (results.isEmpty() ? new OperationsHistory() : results.get(0));
286 entry.setClosedLoopName(event.getClosedLoopControlName());
287 entry.setRequestId(record.getRequestId());
288 entry.setActor(operation.getActor());
289 entry.setOperation(operation.getOperation());
290 entry.setTarget(record.getTargetEntity());
291 entry.setSubrequestId(operation.getSubRequestId());
292 entry.setMessage(operation.getMessage());
293 entry.setOutcome(operation.getOutcome());
294 if (operation.getStart() != null) {
295 entry.setStarttime(new Date(operation.getStart().toEpochMilli()));
297 entry.setStarttime(null);
299 if (operation.getEnd() != null) {
300 entry.setEndtime(new Date(operation.getEnd().toEpochMilli()));
302 entry.setEndtime(null);
305 if (results.isEmpty()) {
306 logger.info("insert operation history record for {}", event.getRequestId());
308 entityMgr.persist(entry);
310 logger.info("update operation history record for {}", event.getRequestId());
312 entityMgr.merge(entry);
317 * Converts the parameters to Properties.
319 * @param params parameters to be converted
320 * @return a new property set
322 private Properties toProperties(OperationHistoryDataManagerParams params) {
323 Properties props = new Properties();
324 props.put(PersistenceUnitProperties.JDBC_URL, params.getUrl());
325 props.put(PersistenceUnitProperties.JDBC_USER, params.getUserName());
326 props.put(PersistenceUnitProperties.JDBC_PASSWORD, params.getPassword());
327 props.put(PersistenceUnitProperties.CLASSLOADER, getClass().getClassLoader());
336 private static class Record {
337 private String requestId;
338 private VirtualControlLoopEvent event;
339 private String targetEntity;
340 private ControlLoopOperation operation;
343 // the following may be overridden by junit tests
345 protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) {
346 return Persistence.createEntityManagerFactory(opsHistPu, props);
349 protected Thread makeThread(EntityManagerFactory emfactory, Consumer<EntityManagerFactory> command) {
350 return new Thread(() -> command.accept(emfactory));