2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.ophistory;
23 import java.util.Date;
24 import java.util.Properties;
25 import java.util.concurrent.BlockingQueue;
26 import java.util.concurrent.LinkedBlockingQueue;
27 import java.util.function.Consumer;
28 import javax.persistence.EntityManager;
29 import javax.persistence.EntityManagerFactory;
30 import javax.persistence.Persistence;
31 import lombok.AllArgsConstructor;
33 import lombok.NoArgsConstructor;
34 import lombok.ToString;
35 import org.eclipse.persistence.config.PersistenceUnitProperties;
36 import org.onap.policy.common.parameters.ValidationResult;
37 import org.onap.policy.common.utils.jpa.EntityMgrCloser;
38 import org.onap.policy.common.utils.jpa.EntityTransCloser;
39 import org.onap.policy.controlloop.ControlLoopOperation;
40 import org.onap.policy.controlloop.VirtualControlLoopEvent;
41 import org.onap.policy.database.operationshistory.Dbao;
42 import org.onap.policy.guard.Util;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
47 * Data manager that stores records in the DB, asynchronously, using a background thread.
49 public class OperationHistoryDataManagerImpl implements OperationHistoryDataManager {
50 private static final Logger logger = LoggerFactory.getLogger(OperationHistoryDataManagerImpl.class);
53 * Added to the end of {@link #operations} when {@link #stop()} is called. This is
54 * used to get the background thread out of a blocking wait for the next record.
56 private static final Record END_MARKER = new Record();
58 // copied from the parameters
59 private final int maxQueueLength;
60 private final int batchSize;
62 private final EntityManagerFactory emFactory;
65 * Thread that takes records from {@link #operations} and stores them in the DB.
67 private Thread thread;
70 * Set to {@code true} to stop the background thread.
72 private boolean stopped = false;
75 * Queue of operations waiting to be stored in the DB. When {@link #stop()} is called,
76 * an {@link #END_MARKER} is added to the end of the queue.
78 private final BlockingQueue<Record> operations = new LinkedBlockingQueue<>();
81 * Number of records that have been added to the DB by this data manager instance.
84 private long recordsAdded = 0;
88 * Constructs the object.
90 * @param params data manager parameters
92 public OperationHistoryDataManagerImpl(OperationHistoryDataManagerParams params) {
93 ValidationResult result = params.validate("data-manager-properties");
94 if (!result.isValid()) {
95 throw new IllegalArgumentException(result.getResult());
98 this.maxQueueLength = params.getMaxQueueLength();
99 this.batchSize = params.getBatchSize();
101 // create the factory using the properties
102 Properties props = toProperties(params);
103 this.emFactory = makeEntityManagerFactory(params.getPersistenceUnit(), props);
107 public synchronized void start() {
108 if (stopped || thread != null) {
113 logger.info("start operation history thread");
115 thread = makeThread(emFactory, this::run);
116 thread.setDaemon(true);
121 public synchronized void stop() {
122 logger.info("requesting stop of operation history thread");
126 if (thread == null) {
127 // no thread to close the factory - do it here
131 // the thread will close the factory when it sees the end marker
132 operations.add(END_MARKER);
137 public synchronized void store(String requestId, VirtualControlLoopEvent event, String targetEntity,
138 ControlLoopOperation operation) {
141 logger.warn("operation history thread is stopped, discarding requestId={} event={} operation={}", requestId,
146 operations.add(new Record(requestId, event, targetEntity, operation));
148 if (operations.size() > maxQueueLength) {
149 Record discarded = operations.remove();
150 logger.warn("too many items to store in the operation history table, discarding {}", discarded);
155 * Takes records from {@link #operations} and stores them in the queue. Continues to
156 * run until {@link #stop()} is invoked, or the thread is interrupted.
158 * @param emfactory entity manager factory
160 private void run(EntityManagerFactory emfactory) {
162 // store records until stopped, continuing if an exception occurs
165 Record triple = operations.take();
166 storeBatch(emfactory.createEntityManager(), triple);
168 } catch (RuntimeException e) {
169 logger.error("failed to save data to operation history table", e);
171 } catch (InterruptedException e) {
172 logger.error("interrupted, discarding remaining operation history data", e);
173 Thread.currentThread().interrupt();
178 storeRemainingRecords(emfactory);
181 synchronized (this) {
190 * Store any remaining records, but stop at the first exception.
192 * @param emfactory entity manager factory
194 private void storeRemainingRecords(EntityManagerFactory emfactory) {
196 while (!operations.isEmpty()) {
197 storeBatch(emfactory.createEntityManager(), operations.poll());
200 } catch (RuntimeException e) {
201 logger.error("failed to save remaining data to operation history table", e);
206 * Stores a batch of records.
208 * @param entityManager entity manager
209 * @param firstRecord first record to be stored
211 private void storeBatch(EntityManager entityManager, Record firstRecord) {
212 logger.info("store operation history record batch");
214 try (EntityMgrCloser emc = new EntityMgrCloser(entityManager);
215 EntityTransCloser trans = new EntityTransCloser(entityManager.getTransaction())) {
218 Record record = firstRecord;
220 while (record != null && record != END_MARKER) {
221 storeRecord(entityManager, record);
223 if (++nrecords >= batchSize) {
227 record = operations.poll();
231 recordsAdded += nrecords;
238 * @param entityManager entity manager
239 * @param record record to be stored
241 private void storeRecord(EntityManager entityMgr, Record record) {
242 Dbao newEntry = new Dbao();
244 final VirtualControlLoopEvent event = record.getEvent();
245 final ControlLoopOperation operation = record.getOperation();
247 logger.info("store operation history record for {}", event.getRequestId());
249 newEntry.setClosedLoopName(event.getClosedLoopControlName());
250 newEntry.setRequestId(record.getRequestId());
251 newEntry.setActor(operation.getActor());
252 newEntry.setOperation(operation.getOperation());
253 newEntry.setTarget(record.getTargetEntity());
254 newEntry.setSubrequestId(operation.getSubRequestId());
255 newEntry.setMessage(operation.getMessage());
256 newEntry.setOutcome(operation.getOutcome());
257 if (operation.getStart() != null) {
258 newEntry.setStarttime(new Date(operation.getStart().toEpochMilli()));
260 if (operation.getEnd() != null) {
261 newEntry.setEndtime(new Date(operation.getEnd().toEpochMilli()));
264 entityMgr.persist(newEntry);
268 * Converts the parameters to Properties.
270 * @param params parameters to be converted
271 * @return a new property set
273 private Properties toProperties(OperationHistoryDataManagerParams params) {
274 Properties props = new Properties();
275 props.put(Util.ECLIPSE_LINK_KEY_URL, params.getUrl());
276 props.put(Util.ECLIPSE_LINK_KEY_USER, params.getUserName());
277 props.put(Util.ECLIPSE_LINK_KEY_PASS, params.getPassword());
278 props.put(PersistenceUnitProperties.CLASSLOADER, getClass().getClassLoader());
287 private static class Record {
288 private String requestId;
289 private VirtualControlLoopEvent event;
290 private String targetEntity;
291 private ControlLoopOperation operation;
294 // the following may be overridden by junit tests
296 protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) {
297 return Persistence.createEntityManagerFactory(opsHistPu, props);
300 protected Thread makeThread(EntityManagerFactory emfactory, Consumer<EntityManagerFactory> command) {
301 return new Thread(() -> command.accept(emfactory));