Replace Eclipselink with Hibernate
[policy/drools-applications.git] / controlloop / common / eventmanager / src / main / java / org / onap / policy / controlloop / ophistory / OperationHistoryDataManagerImpl.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2023 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.controlloop.ophistory;
23
24 import java.util.Date;
25 import java.util.List;
26 import java.util.Properties;
27 import java.util.concurrent.BlockingQueue;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import java.util.function.Consumer;
30 import javax.persistence.EntityManager;
31 import javax.persistence.EntityManagerFactory;
32 import javax.persistence.Persistence;
33 import lombok.AllArgsConstructor;
34 import lombok.Getter;
35 import lombok.NoArgsConstructor;
36 import lombok.ToString;
37 import org.onap.policy.common.parameters.ValidationResult;
38 import org.onap.policy.common.utils.jpa.EntityMgrCloser;
39 import org.onap.policy.common.utils.jpa.EntityTransCloser;
40 import org.onap.policy.controlloop.ControlLoopOperation;
41 import org.onap.policy.guard.OperationsHistory;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 /**
46  * Data manager that stores records in the DB, asynchronously, using a background thread.
47  */
48 public class OperationHistoryDataManagerImpl implements OperationHistoryDataManager {
49     private static final Logger logger = LoggerFactory.getLogger(OperationHistoryDataManagerImpl.class);
50
51     /**
52      * Added to the end of {@link #operations} when {@link #stop()} is called. This is
53      * used to get the background thread out of a blocking wait for the next record.
54      */
55     private static final Record END_MARKER = new Record();
56
57     // copied from the parameters
58     private final int maxQueueLength;
59     private final int batchSize;
60
61     private final EntityManagerFactory emFactory;
62
63     /**
64      * Thread that takes records from {@link #operations} and stores them in the DB.
65      */
66     private Thread thread;
67
68     /**
69      * Set to {@code true} to stop the background thread.
70      */
71     private boolean stopped = false;
72
73     /**
74      * Queue of operations waiting to be stored in the DB. When {@link #stop()} is called,
75      * an {@link #END_MARKER} is added to the end of the queue.
76      */
77     private final BlockingQueue<Record> operations = new LinkedBlockingQueue<>();
78
79     /**
80      * Number of records that have been processed and committed into the DB by this data
81      * manager instance.
82      */
83     @Getter
84     private long recordsCommitted = 0;
85
86     /**
87      * Number of records that have been inserted into the DB by this data manager
88      * instance, whether or not they were committed.
89      */
90     @Getter
91     private long recordsInserted = 0;
92
93     /**
94      * Number of records that have been updated within the DB by this data manager
95      * instance, whether or not they were committed.
96      */
97     @Getter
98     private long recordsUpdated = 0;
99
100     /**
101      * Constructs the object.
102      *
103      * @param params data manager parameters
104      */
105     public OperationHistoryDataManagerImpl(OperationHistoryDataManagerParams params) {
106         ValidationResult result = params.validate("data-manager-properties");
107         if (!result.isValid()) {
108             throw new IllegalArgumentException(result.getResult());
109         }
110
111         this.maxQueueLength = params.getMaxQueueLength();
112         this.batchSize = params.getBatchSize();
113
114         // create the factory using the properties
115         var props = toProperties(params);
116         this.emFactory = makeEntityManagerFactory(params.getPersistenceUnit(), props);
117     }
118
119     @Override
120     public synchronized void start() {
121         if (stopped || thread != null) {
122             // already started
123             return;
124         }
125
126         logger.info("start operation history thread");
127
128         thread = makeThread(emFactory, this::run);
129         thread.setDaemon(true);
130         thread.start();
131     }
132
133     @Override
134     public synchronized void stop() {
135         logger.info("requesting stop of operation history thread");
136
137         stopped = true;
138
139         if (thread == null) {
140             // no thread to close the factory - do it here
141             emFactory.close();
142
143         } else {
144             // the thread will close the factory when it sees the end marker
145             operations.add(END_MARKER);
146         }
147     }
148
149     @Override
150     public synchronized void store(String requestId, String clName, Object event, String targetEntity,
151         ControlLoopOperation operation) {
152
153         if (stopped) {
154             logger.warn("operation history thread is stopped, discarding requestId={} event={} operation={}", requestId,
155                 event, operation);
156             return;
157         }
158
159         operations.add(new Record(requestId, clName, event, targetEntity, operation));
160
161         if (operations.size() > maxQueueLength) {
162             Record discarded = operations.remove();
163             logger.warn("too many items to store in the operation history table, discarding {}", discarded);
164         }
165     }
166
167     /**
168      * Takes records from {@link #operations} and stores them in the queue. Continues to
169      * run until {@link #stop()} is invoked, or the thread is interrupted.
170      *
171      * @param emfactory entity manager factory
172      */
173     private void run(EntityManagerFactory emfactory) {
174         try {
175             // store records until stopped, continuing if an exception occurs
176             while (!stopped) {
177                 try {
178                     Record triple = operations.take();
179                     storeBatch(emfactory.createEntityManager(), triple);
180
181                 } catch (RuntimeException e) {
182                     logger.error("failed to save data to operation history table", e);
183
184                 } catch (InterruptedException e) {
185                     logger.error("interrupted, discarding remaining operation history data", e);
186                     Thread.currentThread().interrupt();
187                     return;
188                 }
189             }
190
191             storeRemainingRecords(emfactory);
192
193         } finally {
194             synchronized (this) {
195                 stopped = true;
196             }
197
198             emfactory.close();
199         }
200     }
201
202     /**
203      * Store any remaining records, but stop at the first exception.
204      *
205      * @param emfactory entity manager factory
206      */
207     private void storeRemainingRecords(EntityManagerFactory emfactory) {
208         try {
209             while (!operations.isEmpty()) {
210                 storeBatch(emfactory.createEntityManager(), operations.poll());
211             }
212
213         } catch (RuntimeException e) {
214             logger.error("failed to save remaining data to operation history table", e);
215         }
216     }
217
218     /**
219      * Stores a batch of records.
220      *
221      * @param entityManager entity manager
222      * @param firstRecord first record to be stored
223      */
224     private void storeBatch(EntityManager entityManager, Record firstRecord) {
225         logger.info("store operation history record batch");
226
227         try (var emc = new EntityMgrCloser(entityManager);
228             var trans = new EntityTransCloser(entityManager.getTransaction())) {
229
230             var nrecords = 0;
231             var rec = firstRecord;
232
233             while (rec != null && rec != END_MARKER) {
234                 storeRecord(entityManager, rec);
235
236                 if (++nrecords >= batchSize) {
237                     break;
238                 }
239
240                 rec = operations.poll();
241             }
242
243             trans.commit();
244             recordsCommitted += nrecords;
245         }
246     }
247
248     /**
249      * Stores a record.
250      *
251      * @param entityMgr entity manager
252      * @param rec record to be stored
253      */
254     private void storeRecord(EntityManager entityMgr, Record rec) {
255
256         final String reqId = rec.getRequestId();
257         final String clName = rec.getClName();
258         final ControlLoopOperation operation = rec.getOperation();
259
260         logger.info("store operation history record for {}", reqId);
261
262         List<OperationsHistory> results = entityMgr
263             .createQuery("select e from OperationsHistory e" + " where e.closedLoopName= ?1"
264                 + " and e.requestId= ?2" + " and e.subrequestId= ?3" + " and e.actor= ?4"
265                 + " and e.operation= ?5" + " and e.target= ?6", OperationsHistory.class)
266             .setParameter(1, clName).setParameter(2, rec.getRequestId())
267             .setParameter(3, operation.getSubRequestId()).setParameter(4, operation.getActor())
268             .setParameter(5, operation.getOperation()).setParameter(6, rec.getTargetEntity())
269             .getResultList();
270
271         if (results.size() > 1) {
272             logger.warn("unexpected operation history record count {} for {}", results.size(), reqId);
273         }
274
275         OperationsHistory entry = (results.isEmpty() ? new OperationsHistory() : results.get(0));
276
277         entry.setClosedLoopName(clName);
278         entry.setRequestId(rec.getRequestId());
279         entry.setActor(operation.getActor());
280         entry.setOperation(operation.getOperation());
281         entry.setTarget(rec.getTargetEntity());
282         entry.setSubrequestId(operation.getSubRequestId());
283         entry.setMessage(operation.getMessage());
284         entry.setOutcome(operation.getOutcome());
285         if (operation.getStart() != null) {
286             entry.setStarttime(new Date(operation.getStart().toEpochMilli()));
287         } else {
288             entry.setStarttime(null);
289         }
290         if (operation.getEnd() != null) {
291             entry.setEndtime(new Date(operation.getEnd().toEpochMilli()));
292         } else {
293             entry.setEndtime(null);
294         }
295
296         if (results.isEmpty()) {
297             logger.info("insert operation history record for {}", reqId);
298             ++recordsInserted;
299             entityMgr.persist(entry);
300         } else {
301             logger.info("update operation history record for {}", reqId);
302             ++recordsUpdated;
303             entityMgr.merge(entry);
304         }
305     }
306
307     /**
308      * Converts the parameters to Properties.
309      *
310      * @param params parameters to be converted
311      * @return a new property set
312      */
313     private Properties toProperties(OperationHistoryDataManagerParams params) {
314         var props = new Properties();
315         props.put("javax.persistence.jdbc.driver",   params.getDriver());
316         props.put("javax.persistence.jdbc.url",      params.getUrl());
317         props.put("javax.persistence.jdbc.user",     params.getUserName());
318         props.put("javax.persistence.jdbc.password", params.getPassword());
319         props.put("hibernate.dialect",               params.getDbHibernateDialect());
320
321         return props;
322     }
323
324     @Getter
325     @NoArgsConstructor
326     @AllArgsConstructor
327     @ToString
328     private static class Record {
329         private String requestId;
330         private String clName;
331         private Object event;
332         private String targetEntity;
333         private ControlLoopOperation operation;
334     }
335
336     // the following may be overridden by junit tests
337
338     protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) {
339         return Persistence.createEntityManagerFactory(opsHistPu, props);
340     }
341
342     protected Thread makeThread(EntityManagerFactory emfactory, Consumer<EntityManagerFactory> command) {
343         return new Thread(() -> command.accept(emfactory));
344     }
345 }