2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.ophistory;
23 import java.util.Date;
24 import java.util.List;
25 import java.util.Properties;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.function.Consumer;
29 import javax.persistence.EntityManager;
30 import javax.persistence.EntityManagerFactory;
31 import javax.persistence.Persistence;
32 import lombok.AllArgsConstructor;
34 import lombok.NoArgsConstructor;
35 import lombok.ToString;
36 import org.eclipse.persistence.config.PersistenceUnitProperties;
37 import org.onap.policy.common.parameters.ValidationResult;
38 import org.onap.policy.common.utils.jpa.EntityMgrCloser;
39 import org.onap.policy.common.utils.jpa.EntityTransCloser;
40 import org.onap.policy.controlloop.ControlLoopOperation;
41 import org.onap.policy.controlloop.VirtualControlLoopEvent;
42 import org.onap.policy.guard.OperationsHistory;
43 import org.onap.policy.guard.Util;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
48 * Data manager that stores records in the DB, asynchronously, using a background thread.
50 public class OperationHistoryDataManagerImpl implements OperationHistoryDataManager {
51 private static final Logger logger = LoggerFactory.getLogger(OperationHistoryDataManagerImpl.class);
54 * Added to the end of {@link #operations} when {@link #stop()} is called. This is
55 * used to get the background thread out of a blocking wait for the next record.
// Sentinel: storeBatch() compares against it by identity (record != END_MARKER) and
// never passes it to storeRecord().
57 private static final Record END_MARKER = new Record();
59 // copied from the parameters
// Once the queue holds more than this many records, store() removes the head of the
// queue and logs it as discarded.
60 private final int maxQueueLength;
// storeBatch() compares its per-call record count against this value.
61 private final int batchSize;
// Built once in the constructor through makeEntityManagerFactory(); start() passes it
// to makeThread() so the worker can create entity managers from it.
63 private final EntityManagerFactory emFactory;
66 * Thread that takes records from {@link #operations} and stores them in the DB.
// Null until start() assigns it; stop() tests it for null to choose its shutdown path.
68 private Thread thread;
71 * Set to {@code true} to stop the background thread.
// Read by start() while holding the instance lock.
// NOTE(review): the field is not volatile - confirm that every reader and writer
// synchronizes on this instance.
73 private boolean stopped = false;
76 * Queue of operations waiting to be stored in the DB. When {@link #stop()} is called,
77 * an {@link #END_MARKER} is added to the end of the queue.
// Created without a capacity argument; the length limit is enforced by store() instead.
// Filled by store() and stop(); drained by run(), storeRemainingRecords() and storeBatch().
79 private final BlockingQueue<Record> operations = new LinkedBlockingQueue<>();
82 * Number of records that have been processed and committed into the DB by this data manager instance.
// Increased by the number of records handled in a batch at the end of storeBatch().
86 private long recordsCommitted = 0;
89 * Number of records that have been inserted into the DB by this data manager
90 * instance, whether or not they were committed.
// NOTE(review): no write to this counter is visible in this excerpt; presumably it is
// bumped on the persist() path of storeRecord() - confirm.
93 private long recordsInserted = 0;
96 * Number of records that have been updated within the DB by this data manager
97 * instance, whether or not they were committed.
// NOTE(review): no write to this counter is visible in this excerpt; presumably it is
// bumped on the merge() path of storeRecord() - confirm.
100 private long recordsUpdated = 0;
104 * Constructs the object.
106 * @param params data manager parameters
// Validates the parameters, copies the queue/batch limits and creates the entity
// manager factory. Does not start the worker thread; that is done by start().
108 public OperationHistoryDataManagerImpl(OperationHistoryDataManagerParams params) {
// The string argument is the name handed to the validator for this parameter group.
109 ValidationResult result = params.validate("data-manager-properties");
110 if (!result.isValid()) {
// Reject invalid configuration up front, using the validator's result text as the message.
111 throw new IllegalArgumentException(result.getResult());
114 this.maxQueueLength = params.getMaxQueueLength();
115 this.batchSize = params.getBatchSize();
117 // create the factory using the properties
118 Properties props = toProperties(params);
// makeEntityManagerFactory() is a protected, overridable hook. Because it is invoked
// from the constructor, a subclass override runs before that subclass's own field
// initializers and constructor body have executed.
119 this.emFactory = makeEntityManagerFactory(params.getPersistenceUnit(), props);
// Creates the background worker thread whose body is run(emFactory).
// Holds the instance lock for the duration of the call.
123 public synchronized void start() {
// Guard for "already stopped" or "thread already created".
// NOTE(review): the body of this guard is not visible in this excerpt; presumably it
// returns early - confirm.
124 if (stopped || thread != null) {
129 logger.info("start operation history thread");
// makeThread() is the overridable factory hook; this::run becomes the thread's command.
131 thread = makeThread(emFactory, this::run);
// Mark as daemon so this thread alone does not keep the JVM alive.
132 thread.setDaemon(true);
// NOTE(review): no thread.start() call is visible in this excerpt - confirm the thread
// is actually started.
// Requests shutdown of the data manager. Holds the instance lock for the duration of
// the call.
137 public synchronized void stop() {
138 logger.info("requesting stop of operation history thread");
// NOTE(review): no assignment to `stopped` is visible in this excerpt; presumably it is
// set before the branch below - confirm.
142 if (thread == null) {
143 // no thread to close the factory - do it here
// NOTE(review): the factory-closing statement the comment above refers to, and the
// else-branch header, are not visible in this excerpt.
147 // the thread will close the factory when it sees the end marker
// Appends the sentinel behind any records already queued; a worker waiting in
// operations.take() on an empty queue is woken by it.
148 operations.add(END_MARKER);
// Queues one operation record for asynchronous storage. Holds the instance lock for
// the duration of the call; the actual DB write happens later on the worker thread.
153 public synchronized void store(String requestId, VirtualControlLoopEvent event, String targetEntity,
154 ControlLoopOperation operation) {
// NOTE(review): the condition guarding this warning and the rest of its argument list
// are not visible in this excerpt; judging by the message it is a check of `stopped`
// followed by an early return - confirm.
157 logger.warn("operation history thread is stopped, discarding requestId={} event={} operation={}", requestId,
162 operations.add(new Record(requestId, event, targetEntity, operation));
// Enforce the configured limit after adding: when over the limit, drop the head of the
// queue (the record that has waited longest) rather than the one just added.
164 if (operations.size() > maxQueueLength) {
// NOTE(review): run() and storeBatch() take/poll from the queue without holding this
// instance's lock, so the size() check above and this remove() are not atomic.
// Queue.remove() throws NoSuchElementException on an empty queue; poll() plus a null
// check would avoid that if the worker drains the queue in between.
165 Record discarded = operations.remove();
166 logger.warn("too many items to store in the operation history table, discarding {}", discarded);
171 * Takes records from {@link #operations} and stores them in the DB. Continues to
172 * run until {@link #stop()} is invoked, or the thread is interrupted.
174 * @param emfactory entity manager factory
// Body of the worker thread: waits for queued records and writes them to the DB in
// batches, then flushes whatever is left via storeRemainingRecords().
// NOTE(review): the loop header, the enclosing try/finally and the statements after the
// interrupt handling are not visible in this excerpt.
176 private void run(EntityManagerFactory emfactory) {
178 // store records until stopped, continuing if an exception occurs
// take() blocks until an element is available; that element may be END_MARKER.
181 Record triple = operations.take();
// A new EntityManager is created for every batch; storeBatch() wraps it in an
// EntityMgrCloser inside a try-with-resources.
182 storeBatch(emfactory.createEntityManager(), triple);
// A failed batch is logged and not rethrown from this handler.
184 } catch (RuntimeException e) {
185 logger.error("failed to save data to operation history table", e);
187 } catch (InterruptedException e) {
188 logger.error("interrupted, discarding remaining operation history data", e);
// Re-assert the interrupt flag, which was cleared when InterruptedException was thrown.
189 Thread.currentThread().interrupt();
194 storeRemainingRecords(emfactory);
// NOTE(review): the body of this synchronized block is not visible in this excerpt.
197 synchronized (this) {
206 * Store any remaining records, but stop at the first exception.
208 * @param emfactory entity manager factory
// Drains the queue batch by batch; a RuntimeException is logged and not rethrown from
// the handler below.
// NOTE(review): the opening `try` is not visible in this excerpt, so whether it wraps
// the whole loop (stop at first failure, as the Javadoc says) cannot be confirmed here.
210 private void storeRemainingRecords(EntityManagerFactory emfactory) {
212 while (!operations.isEmpty()) {
// poll() never blocks. It may hand back null (queue emptied meanwhile) or END_MARKER;
// storeBatch()'s loop condition skips both.
213 storeBatch(emfactory.createEntityManager(), operations.poll());
216 } catch (RuntimeException e) {
217 logger.error("failed to save remaining data to operation history table", e);
222 * Stores a batch of records.
224 * @param entityManager entity manager
225 * @param firstRecord first record to be stored
// Stores firstRecord and then further records polled from the queue, all using the
// given entity manager and one transaction object obtained from it.
227 private void storeBatch(EntityManager entityManager, Record firstRecord) {
228 logger.info("store operation history record batch");
// try-with-resources: `trans` is closed first, then `emc` (reverse declaration order).
// What the closers do on close is defined in EntityMgrCloser / EntityTransCloser.
230 try (EntityMgrCloser emc = new EntityMgrCloser(entityManager);
231 EntityTransCloser trans = new EntityTransCloser(entityManager.getTransaction())) {
// NOTE(review): the declaration of `nrecords` is not visible in this excerpt.
234 Record record = firstRecord;
// Stop on an empty poll (null) or on the shutdown sentinel, which is compared by identity.
236 while (record != null && record != END_MARKER) {
237 storeRecord(entityManager, record);
// Counts the record just stored and tests the batch limit.
// NOTE(review): the body of this `if` is not visible; presumably it leaves the loop - confirm.
239 if (++nrecords >= batchSize) {
// Non-blocking fetch of the next queued record, if any.
243 record = operations.poll();
// NOTE(review): no commit call is visible in this excerpt; presumably the transaction is
// committed before this counter is updated - confirm.
247 recordsCommitted += nrecords;
254 * @param entityMgr entity manager
255 * @param record record to be stored
// Inserts or updates the OperationsHistory row matching one record: looks for an
// existing row by its identifying fields, copies the record's values onto the entity,
// then persists a new entity or merges the existing one.
257 private void storeRecord(EntityManager entityMgr, Record record) {
259 final VirtualControlLoopEvent event = record.getEvent();
260 final ControlLoopOperation operation = record.getOperation();
// Log lines in this method use event.getRequestId(), whereas the query and the entity
// use record.getRequestId().
262 logger.info("store operation history record for {}", event.getRequestId());
// Look up existing rows using positional query parameters.
264 List<OperationsHistory> results =
265 entityMgr.createQuery("select e from OperationsHistory e"
266 + " where e.closedLoopName= ?1"
267 + " and e.requestId= ?2"
268 + " and e.subrequestId= ?3"
// NOTE(review): no clause using ?4 is visible in this excerpt although parameter 4
// (the actor) is bound below; presumably it sits on a line missing here - confirm.
270 + " and e.operation= ?5"
271 + " and e.target= ?6",
272 OperationsHistory.class)
273 .setParameter(1, event.getClosedLoopControlName())
274 .setParameter(2, record.getRequestId())
275 .setParameter(3, operation.getSubRequestId())
276 .setParameter(4, operation.getActor())
277 .setParameter(5, operation.getOperation())
278 .setParameter(6, record.getTargetEntity())
// NOTE(review): the call that executes the query and yields the list is not visible in
// this excerpt.
// More than one match is only reported; processing continues with the first match.
281 if (results.size() > 1) {
282 logger.warn("unexpected operation history record count {} for {}", results.size(), event.getRequestId());
// Reuse the first matching row when there is one, otherwise start a new entity.
285 OperationsHistory entry = (results.isEmpty() ? new OperationsHistory() : results.get(0));
// All fields are overwritten, including the ones that were used as lookup keys.
287 entry.setClosedLoopName(event.getClosedLoopControlName());
288 entry.setRequestId(record.getRequestId());
289 entry.setActor(operation.getActor());
290 entry.setOperation(operation.getOperation());
291 entry.setTarget(record.getTargetEntity());
292 entry.setSubrequestId(operation.getSubRequestId());
293 entry.setMessage(operation.getMessage());
294 entry.setOutcome(operation.getOutcome());
// Start and end times are converted to java.util.Date via epoch milliseconds; a missing
// value is stored as null, which also clears any earlier value on a reused row.
295 if (operation.getStart() != null) {
296 entry.setStarttime(new Date(operation.getStart().toEpochMilli()));
298 entry.setStarttime(null);
300 if (operation.getEnd() != null) {
301 entry.setEndtime(new Date(operation.getEnd().toEpochMilli()));
303 entry.setEndtime(null);
// No existing row was found: persist the new entity. Otherwise merge the modified one.
306 if (results.isEmpty()) {
307 logger.info("insert operation history record for {}", event.getRequestId());
309 entityMgr.persist(entry);
311 logger.info("update operation history record for {}", event.getRequestId());
313 entityMgr.merge(entry);
318 * Converts the parameters to Properties.
320 * @param params parameters to be converted
321 * @return a new property set
// Builds the property set handed to makeEntityManagerFactory() by the constructor.
323 private Properties toProperties(OperationHistoryDataManagerParams params) {
324 Properties props = new Properties();
// Connection settings, stored under the key constants defined in org.onap.policy.guard.Util.
// Properties.put() rejects null keys and values with a NullPointerException.
325 props.put(Util.ECLIPSE_LINK_KEY_URL, params.getUrl());
326 props.put(Util.ECLIPSE_LINK_KEY_USER, params.getUserName());
327 props.put(Util.ECLIPSE_LINK_KEY_PASS, params.getPassword());
// Supplies this class's own class loader under the EclipseLink CLASSLOADER property.
328 props.put(PersistenceUnitProperties.CLASSLOADER, getClass().getClassLoader());
// NOTE(review): the return statement is not visible in this excerpt; per the Javadoc the
// method returns the new property set.
// One queued unit of work: the values store() received for a single operation.
// Created with four arguments in store() and with none for END_MARKER; storeRecord()
// reads the fields through getRequestId()/getEvent()/getTargetEntity()/getOperation().
// NOTE(review): no constructors or accessors are declared in the visible lines;
// presumably they come from Lombok annotations missing from this excerpt - confirm.
337 private static class Record {
338 private String requestId;
339 private VirtualControlLoopEvent event;
340 private String targetEntity;
341 private ControlLoopOperation operation;
344 // the following may be overridden by junit tests
// Factory hook called from the constructor: creates the JPA entity manager factory for
// the named persistence unit, with `props` passed as the properties map.
346 protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) {
347 return Persistence.createEntityManagerFactory(opsHistPu, props);
// Factory hook called from start(): returns a new Thread that has not been started and
// whose body applies `command` to `emfactory`.
350 protected Thread makeThread(EntityManagerFactory emfactory, Consumer<EntityManagerFactory> command) {
351 return new Thread(() -> command.accept(emfactory));