2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
 
   6  * Modifications Copyright (C) 2023 Nordix Foundation.
 
   7  * ================================================================================
 
   8  * Licensed under the Apache License, Version 2.0 (the "License");
 
   9  * you may not use this file except in compliance with the License.
 
  10  * You may obtain a copy of the License at
 
  12  *      http://www.apache.org/licenses/LICENSE-2.0
 
  14  * Unless required by applicable law or agreed to in writing, software
 
  15  * distributed under the License is distributed on an "AS IS" BASIS,
 
  16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  17  * See the License for the specific language governing permissions and
 
  18  * limitations under the License.
 
  19  * ============LICENSE_END=========================================================
 
  22 package org.onap.policy.controlloop.ophistory;
 
  24 import jakarta.persistence.EntityManager;
 
  25 import jakarta.persistence.EntityManagerFactory;
 
  26 import jakarta.persistence.Persistence;
 
  27 import java.util.Date;
 
  28 import java.util.List;
 
  29 import java.util.Properties;
 
  30 import java.util.concurrent.BlockingQueue;
 
  31 import java.util.concurrent.LinkedBlockingQueue;
 
  32 import java.util.function.Consumer;
 
  33 import lombok.AllArgsConstructor;
 
  35 import lombok.NoArgsConstructor;
 
  36 import lombok.ToString;
 
  37 import org.onap.policy.common.parameters.ValidationResult;
 
  38 import org.onap.policy.common.utils.jpa.EntityMgrCloser;
 
  39 import org.onap.policy.common.utils.jpa.EntityTransCloser;
 
  40 import org.onap.policy.controlloop.ControlLoopOperation;
 
  41 import org.onap.policy.guard.OperationsHistory;
 
  42 import org.slf4j.Logger;
 
  43 import org.slf4j.LoggerFactory;
 
  46  * Data manager that stores records in the DB, asynchronously, using a background thread.
 
  48 public class OperationHistoryDataManagerImpl implements OperationHistoryDataManager {
 
  49     private static final Logger logger = LoggerFactory.getLogger(OperationHistoryDataManagerImpl.class);
 
  52      * Added to the end of {@link #operations} when {@link #stop()} is called. This is
 
  53      * used to get the background thread out of a blocking wait for the next record.
 
  55     private static final Record END_MARKER = new Record();
 
  57     // copied from the parameters
 
  58     private final int maxQueueLength;
 
  59     private final int batchSize;
 
  61     private final EntityManagerFactory emFactory;
 
  64      * Thread that takes records from {@link #operations} and stores them in the DB.
 
  66     private Thread thread;
 
  69      * Set to {@code true} to stop the background thread.
 
  71     private boolean stopped = false;
 
  74      * Queue of operations waiting to be stored in the DB. When {@link #stop()} is called,
 
  75      * an {@link #END_MARKER} is added to the end of the queue.
 
  77     private final BlockingQueue<Record> operations = new LinkedBlockingQueue<>();
 
  80      * Number of records that have been processed and committed into the DB by this data
 
  84     private long recordsCommitted = 0;
 
  87      * Number of records that have been inserted into the DB by this data manager
 
  88      * instance, whether or not they were committed.
 
  91     private long recordsInserted = 0;
 
  94      * Number of records that have been updated within the DB by this data manager
 
  95      * instance, whether or not they were committed.
 
  98     private long recordsUpdated = 0;
 
 101      * Constructs the object.
 
 103      * @param params data manager parameters
 
 105     public OperationHistoryDataManagerImpl(OperationHistoryDataManagerParams params) {
 
 106         ValidationResult result = params.validate("data-manager-properties");
 
 107         if (!result.isValid()) {
 
 108             throw new IllegalArgumentException(result.getResult());
 
 111         this.maxQueueLength = params.getMaxQueueLength();
 
 112         this.batchSize = params.getBatchSize();
 
 114         // create the factory using the properties
 
 115         var props = toProperties(params);
 
 116         this.emFactory = makeEntityManagerFactory(params.getPersistenceUnit(), props);
 
 120     public synchronized void start() {
 
 121         if (stopped || thread != null) {
 
 126         logger.info("start operation history thread");
 
 128         thread = makeThread(emFactory, this::run);
 
 129         thread.setDaemon(true);
 
 134     public synchronized void stop() {
 
 135         logger.info("requesting stop of operation history thread");
 
 139         if (thread == null) {
 
 140             // no thread to close the factory - do it here
 
 144             // the thread will close the factory when it sees the end marker
 
 145             operations.add(END_MARKER);
 
 150     public synchronized void store(String requestId, String clName, Object event, String targetEntity,
 
 151         ControlLoopOperation operation) {
 
 154             logger.warn("operation history thread is stopped, discarding requestId={} event={} operation={}", requestId,
 
 159         operations.add(new Record(requestId, clName, event, targetEntity, operation));
 
 161         if (operations.size() > maxQueueLength) {
 
 162             Record discarded = operations.remove();
 
 163             logger.warn("too many items to store in the operation history table, discarding {}", discarded);
 
 168      * Takes records from {@link #operations} and stores them in the queue. Continues to
 
 169      * run until {@link #stop()} is invoked, or the thread is interrupted.
 
 171      * @param emfactory entity manager factory
 
 173     private void run(EntityManagerFactory emfactory) {
 
 175             // store records until stopped, continuing if an exception occurs
 
 178                     Record triple = operations.take();
 
 179                     storeBatch(emfactory.createEntityManager(), triple);
 
 181                 } catch (RuntimeException e) {
 
 182                     logger.error("failed to save data to operation history table", e);
 
 184                 } catch (InterruptedException e) {
 
 185                     logger.error("interrupted, discarding remaining operation history data", e);
 
 186                     Thread.currentThread().interrupt();
 
 191             storeRemainingRecords(emfactory);
 
 194             synchronized (this) {
 
 203      * Store any remaining records, but stop at the first exception.
 
 205      * @param emfactory entity manager factory
 
 207     private void storeRemainingRecords(EntityManagerFactory emfactory) {
 
 209             while (!operations.isEmpty()) {
 
 210                 storeBatch(emfactory.createEntityManager(), operations.poll());
 
 213         } catch (RuntimeException e) {
 
 214             logger.error("failed to save remaining data to operation history table", e);
 
 219      * Stores a batch of records.
 
 221      * @param entityManager entity manager
 
 222      * @param firstRecord first record to be stored
 
 224     private void storeBatch(EntityManager entityManager, Record firstRecord) {
 
 225         logger.info("store operation history record batch");
 
 227         try (var emc = new EntityMgrCloser(entityManager);
 
 228             var trans = new EntityTransCloser(entityManager.getTransaction())) {
 
 231             var rec = firstRecord;
 
 233             while (rec != null && rec != END_MARKER) {
 
 234                 storeRecord(entityManager, rec);
 
 236                 if (++nrecords >= batchSize) {
 
 240                 rec = operations.poll();
 
 244             recordsCommitted += nrecords;
 
 251      * @param entityMgr entity manager
 
 252      * @param rec record to be stored
 
 254     private void storeRecord(EntityManager entityMgr, Record rec) {
 
 256         final String reqId = rec.getRequestId();
 
 257         final String clName = rec.getClName();
 
 258         final ControlLoopOperation operation = rec.getOperation();
 
 260         logger.info("store operation history record for {}", reqId);
 
 262         List<OperationsHistory> results = entityMgr
 
 263             .createQuery("select e from OperationsHistory e" + " where e.closedLoopName= ?1"
 
 264                 + " and e.requestId= ?2" + " and e.subrequestId= ?3" + " and e.actor= ?4"
 
 265                 + " and e.operation= ?5" + " and e.target= ?6", OperationsHistory.class)
 
 266             .setParameter(1, clName).setParameter(2, rec.getRequestId())
 
 267             .setParameter(3, operation.getSubRequestId()).setParameter(4, operation.getActor())
 
 268             .setParameter(5, operation.getOperation()).setParameter(6, rec.getTargetEntity())
 
 271         if (results.size() > 1) {
 
 272             logger.warn("unexpected operation history record count {} for {}", results.size(), reqId);
 
 275         OperationsHistory entry = (results.isEmpty() ? new OperationsHistory() : results.get(0));
 
 277         entry.setClosedLoopName(clName);
 
 278         entry.setRequestId(rec.getRequestId());
 
 279         entry.setActor(operation.getActor());
 
 280         entry.setOperation(operation.getOperation());
 
 281         entry.setTarget(rec.getTargetEntity());
 
 282         entry.setSubrequestId(operation.getSubRequestId());
 
 283         entry.setMessage(operation.getMessage());
 
 284         entry.setOutcome(operation.getOutcome());
 
 285         if (operation.getStart() != null) {
 
 286             entry.setStarttime(new Date(operation.getStart().toEpochMilli()));
 
 288             entry.setStarttime(null);
 
 290         if (operation.getEnd() != null) {
 
 291             entry.setEndtime(new Date(operation.getEnd().toEpochMilli()));
 
 293             entry.setEndtime(null);
 
 296         if (results.isEmpty()) {
 
 297             logger.info("insert operation history record for {}", reqId);
 
 299             entityMgr.persist(entry);
 
 301             logger.info("update operation history record for {}", reqId);
 
 303             entityMgr.merge(entry);
 
 308      * Converts the parameters to Properties.
 
 310      * @param params parameters to be converted
 
 311      * @return a new property set
 
 313     private Properties toProperties(OperationHistoryDataManagerParams params) {
 
 314         var props = new Properties();
 
 315         props.put("jakarta.persistence.jdbc.driver",   params.getDriver());
 
 316         props.put("jakarta.persistence.jdbc.url",      params.getUrl());
 
 317         props.put("jakarta.persistence.jdbc.user",     params.getUserName());
 
 318         props.put("jakarta.persistence.jdbc.password", params.getPassword());
 
 319         props.put("hibernate.dialect",               params.getDbHibernateDialect());
 
 328     private static class Record {
 
 329         private String requestId;
 
 330         private String clName;
 
 331         private Object event;
 
 332         private String targetEntity;
 
 333         private ControlLoopOperation operation;
 
 336     // the following may be overridden by junit tests
 
 338     protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) {
 
 339         return Persistence.createEntityManagerFactory(opsHistPu, props);
 
 342     protected Thread makeThread(EntityManagerFactory emfactory, Consumer<EntityManagerFactory> command) {
 
 343         return new Thread(() -> command.accept(emfactory));