* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
package org.onap.policy.controlloop.ophistory;
+import jakarta.persistence.EntityManager;
+import jakarta.persistence.EntityManagerFactory;
+import jakarta.persistence.Persistence;
import java.util.Date;
+import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
-import javax.persistence.EntityManager;
-import javax.persistence.EntityManagerFactory;
-import javax.persistence.Persistence;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
-import org.eclipse.persistence.config.PersistenceUnitProperties;
import org.onap.policy.common.parameters.ValidationResult;
import org.onap.policy.common.utils.jpa.EntityMgrCloser;
import org.onap.policy.common.utils.jpa.EntityTransCloser;
import org.onap.policy.controlloop.ControlLoopOperation;
-import org.onap.policy.controlloop.VirtualControlLoopEvent;
-import org.onap.policy.database.operationshistory.Dbao;
-import org.onap.policy.guard.Util;
+import org.onap.policy.guard.OperationsHistory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final BlockingQueue<Record> operations = new LinkedBlockingQueue<>();
/**
- * Number of records that have been added to the DB by this data manager instance.
+ * Number of records that have been processed and committed into the DB by this data
+ * manager instance.
*/
@Getter
- private long recordsAdded = 0;
+ private long recordsCommitted = 0;
+ /**
+ * Number of records that have been inserted into the DB by this data manager
+ * instance, whether or not they were committed.
+ */
+ @Getter
+ private long recordsInserted = 0;
+
+ /**
+ * Number of records that have been updated within the DB by this data manager
+ * instance, whether or not they were committed.
+ */
+ @Getter
+ private long recordsUpdated = 0;
/**
* Constructs the object.
this.batchSize = params.getBatchSize();
// create the factory using the properties
- Properties props = toProperties(params);
+ var props = toProperties(params);
this.emFactory = makeEntityManagerFactory(params.getPersistenceUnit(), props);
}
return;
}
+ logger.info("start operation history thread");
+
thread = makeThread(emFactory, this::run);
thread.setDaemon(true);
thread.start();
@Override
public synchronized void stop() {
+ logger.info("requesting stop of operation history thread");
+
stopped = true;
if (thread == null) {
}
@Override
- public synchronized void store(String requestId, VirtualControlLoopEvent event, ControlLoopOperation operation) {
+ public synchronized void store(String requestId, String clName, Object event, String targetEntity,
+ ControlLoopOperation operation) {
if (stopped) {
logger.warn("operation history thread is stopped, discarding requestId={} event={} operation={}", requestId,
- event, operation);
+ event, operation);
return;
}
- operations.add(new Record(requestId, event, operation));
+ operations.add(new Record(requestId, clName, event, targetEntity, operation));
if (operations.size() > maxQueueLength) {
Record discarded = operations.remove();
* @param firstRecord first record to be stored
*/
private void storeBatch(EntityManager entityManager, Record firstRecord) {
+ logger.info("store operation history record batch");
- try (EntityMgrCloser emc = new EntityMgrCloser(entityManager);
- EntityTransCloser trans = new EntityTransCloser(entityManager.getTransaction())) {
+ try (var emc = new EntityMgrCloser(entityManager);
+ var trans = new EntityTransCloser(entityManager.getTransaction())) {
- int nrecords = 0;
- Record record = firstRecord;
+ var nrecords = 0;
+ var rec = firstRecord;
- while (record != null && record != END_MARKER) {
- storeRecord(entityManager, record);
+ while (rec != null && rec != END_MARKER) {
+ storeRecord(entityManager, rec);
if (++nrecords >= batchSize) {
break;
}
- record = operations.poll();
+ rec = operations.poll();
}
trans.commit();
- recordsAdded += nrecords;
+ recordsCommitted += nrecords;
}
}
/**
* Stores a record.
*
- * @param entityManager entity manager
- * @param record record to be stored
+ * @param entityMgr entity manager
+ * @param rec record to be stored
*/
- private void storeRecord(EntityManager entityMgr, Record record) {
+ private void storeRecord(EntityManager entityMgr, Record rec) {
+
+ final String reqId = rec.getRequestId();
+ final String clName = rec.getClName();
+ final ControlLoopOperation operation = rec.getOperation();
+
+ logger.info("store operation history record for {}", reqId);
- Dbao newEntry = new Dbao();
+ List<OperationsHistory> results = entityMgr
+ .createQuery("select e from OperationsHistory e" + " where e.closedLoopName= ?1"
+ + " and e.requestId= ?2" + " and e.subrequestId= ?3" + " and e.actor= ?4"
+ + " and e.operation= ?5" + " and e.target= ?6", OperationsHistory.class)
+ .setParameter(1, clName).setParameter(2, rec.getRequestId())
+ .setParameter(3, operation.getSubRequestId()).setParameter(4, operation.getActor())
+ .setParameter(5, operation.getOperation()).setParameter(6, rec.getTargetEntity())
+ .getResultList();
+
+ if (results.size() > 1) {
+ logger.warn("unexpected operation history record count {} for {}", results.size(), reqId);
+ }
- final VirtualControlLoopEvent event = record.getEvent();
- final ControlLoopOperation operation = record.getOperation();
+ OperationsHistory entry = (results.isEmpty() ? new OperationsHistory() : results.get(0));
- newEntry.setClosedLoopName(event.getClosedLoopControlName());
- newEntry.setRequestId(record.getRequestId());
- newEntry.setActor(operation.getActor());
- newEntry.setOperation(operation.getOperation());
- newEntry.setTarget(operation.getTarget());
- newEntry.setSubrequestId(operation.getSubRequestId());
- newEntry.setMessage(operation.getMessage());
- newEntry.setOutcome(operation.getOutcome());
+ entry.setClosedLoopName(clName);
+ entry.setRequestId(rec.getRequestId());
+ entry.setActor(operation.getActor());
+ entry.setOperation(operation.getOperation());
+ entry.setTarget(rec.getTargetEntity());
+ entry.setSubrequestId(operation.getSubRequestId());
+ entry.setMessage(operation.getMessage());
+ entry.setOutcome(operation.getOutcome());
if (operation.getStart() != null) {
- newEntry.setStarttime(new Date(operation.getStart().toEpochMilli()));
+ entry.setStarttime(new Date(operation.getStart().toEpochMilli()));
+ } else {
+ entry.setStarttime(null);
}
if (operation.getEnd() != null) {
- newEntry.setEndtime(new Date(operation.getEnd().toEpochMilli()));
+ entry.setEndtime(new Date(operation.getEnd().toEpochMilli()));
+ } else {
+ entry.setEndtime(null);
}
- entityMgr.persist(newEntry);
+ if (results.isEmpty()) {
+ logger.info("insert operation history record for {}", reqId);
+ ++recordsInserted;
+ entityMgr.persist(entry);
+ } else {
+ logger.info("update operation history record for {}", reqId);
+ ++recordsUpdated;
+ entityMgr.merge(entry);
+ }
}
/**
* @return a new property set
*/
private Properties toProperties(OperationHistoryDataManagerParams params) {
- Properties props = new Properties();
- props.put(Util.ECLIPSE_LINK_KEY_URL, params.getUrl());
- props.put(Util.ECLIPSE_LINK_KEY_USER, params.getUserName());
- props.put(Util.ECLIPSE_LINK_KEY_PASS, params.getPassword());
- props.put(PersistenceUnitProperties.CLASSLOADER, getClass().getClassLoader());
+ var props = new Properties();
+ props.put("jakarta.persistence.jdbc.driver", params.getDriver());
+ props.put("jakarta.persistence.jdbc.url", params.getUrl());
+ props.put("jakarta.persistence.jdbc.user", params.getUserName());
+ props.put("jakarta.persistence.jdbc.password", params.getPassword());
+ props.put("hibernate.dialect", params.getDbHibernateDialect());
return props;
}
@ToString
private static class Record {
private String requestId;
- private VirtualControlLoopEvent event;
+ private String clName;
+ private Object event;
+ private String targetEntity;
private ControlLoopOperation operation;
}