8e8b1e5c764c237281fbf00a82e3873cd0d8f9e2
[policy/drools-applications.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.ophistory;
22
23 import java.util.Date;
24 import java.util.List;
25 import java.util.Properties;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.function.Consumer;
29 import javax.persistence.EntityManager;
30 import javax.persistence.EntityManagerFactory;
31 import javax.persistence.Persistence;
32 import lombok.AllArgsConstructor;
33 import lombok.Getter;
34 import lombok.NoArgsConstructor;
35 import lombok.ToString;
36 import org.eclipse.persistence.config.PersistenceUnitProperties;
37 import org.onap.policy.common.parameters.ValidationResult;
38 import org.onap.policy.common.utils.jpa.EntityMgrCloser;
39 import org.onap.policy.common.utils.jpa.EntityTransCloser;
40 import org.onap.policy.controlloop.ControlLoopOperation;
41 import org.onap.policy.guard.OperationsHistory;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 /**
46  * Data manager that stores records in the DB, asynchronously, using a background thread.
47  */
48 public class OperationHistoryDataManagerImpl implements OperationHistoryDataManager {
49     private static final Logger logger = LoggerFactory.getLogger(OperationHistoryDataManagerImpl.class);
50
51     /**
52      * Added to the end of {@link #operations} when {@link #stop()} is called. This is
53      * used to get the background thread out of a blocking wait for the next record.
54      */
55     private static final Record END_MARKER = new Record();
56
57     // copied from the parameters
58     private final int maxQueueLength;
59     private final int batchSize;
60
61     private final EntityManagerFactory emFactory;
62
63     /**
64      * Thread that takes records from {@link #operations} and stores them in the DB.
65      */
66     private Thread thread;
67
68     /**
69      * Set to {@code true} to stop the background thread.
70      */
71     private boolean stopped = false;
72
73     /**
74      * Queue of operations waiting to be stored in the DB. When {@link #stop()} is called,
75      * an {@link #END_MARKER} is added to the end of the queue.
76      */
77     private final BlockingQueue<Record> operations = new LinkedBlockingQueue<>();
78
79     /**
80      * Number of records that have been processed and committed into the DB by this data
81      * manager instance.
82      */
83     @Getter
84     private long recordsCommitted = 0;
85
86     /**
87      * Number of records that have been inserted into the DB by this data manager
88      * instance, whether or not they were committed.
89      */
90     @Getter
91     private long recordsInserted = 0;
92
93     /**
94      * Number of records that have been updated within the DB by this data manager
95      * instance, whether or not they were committed.
96      */
97     @Getter
98     private long recordsUpdated = 0;
99
100
101     /**
102      * Constructs the object.
103      *
104      * @param params data manager parameters
105      */
106     public OperationHistoryDataManagerImpl(OperationHistoryDataManagerParams params) {
107         ValidationResult result = params.validate("data-manager-properties");
108         if (!result.isValid()) {
109             throw new IllegalArgumentException(result.getResult());
110         }
111
112         this.maxQueueLength = params.getMaxQueueLength();
113         this.batchSize = params.getBatchSize();
114
115         // create the factory using the properties
116         var props = toProperties(params);
117         this.emFactory = makeEntityManagerFactory(params.getPersistenceUnit(), props);
118     }
119
120     @Override
121     public synchronized void start() {
122         if (stopped || thread != null) {
123             // already started
124             return;
125         }
126
127         logger.info("start operation history thread");
128
129         thread = makeThread(emFactory, this::run);
130         thread.setDaemon(true);
131         thread.start();
132     }
133
134     @Override
135     public synchronized void stop() {
136         logger.info("requesting stop of operation history thread");
137
138         stopped = true;
139
140         if (thread == null) {
141             // no thread to close the factory - do it here
142             emFactory.close();
143
144         } else {
145             // the thread will close the factory when it sees the end marker
146             operations.add(END_MARKER);
147         }
148     }
149
150     @Override
151     public synchronized void store(String requestId, String clName, Object event, String targetEntity,
152                     ControlLoopOperation operation) {
153
154         if (stopped) {
155             logger.warn("operation history thread is stopped, discarding requestId={} event={} operation={}", requestId,
156                             event, operation);
157             return;
158         }
159
160         operations.add(new Record(requestId, clName, event, targetEntity, operation));
161
162         if (operations.size() > maxQueueLength) {
163             Record discarded = operations.remove();
164             logger.warn("too many items to store in the operation history table, discarding {}", discarded);
165         }
166     }
167
168     /**
169      * Takes records from {@link #operations} and stores them in the queue. Continues to
170      * run until {@link #stop()} is invoked, or the thread is interrupted.
171      *
172      * @param emfactory entity manager factory
173      */
174     private void run(EntityManagerFactory emfactory) {
175         try {
176             // store records until stopped, continuing if an exception occurs
177             while (!stopped) {
178                 try {
179                     Record triple = operations.take();
180                     storeBatch(emfactory.createEntityManager(), triple);
181
182                 } catch (RuntimeException e) {
183                     logger.error("failed to save data to operation history table", e);
184
185                 } catch (InterruptedException e) {
186                     logger.error("interrupted, discarding remaining operation history data", e);
187                     Thread.currentThread().interrupt();
188                     return;
189                 }
190             }
191
192             storeRemainingRecords(emfactory);
193
194         } finally {
195             synchronized (this) {
196                 stopped = true;
197             }
198
199             emfactory.close();
200         }
201     }
202
203     /**
204      * Store any remaining records, but stop at the first exception.
205      *
206      * @param emfactory entity manager factory
207      */
208     private void storeRemainingRecords(EntityManagerFactory emfactory) {
209         try {
210             while (!operations.isEmpty()) {
211                 storeBatch(emfactory.createEntityManager(), operations.poll());
212             }
213
214         } catch (RuntimeException e) {
215             logger.error("failed to save remaining data to operation history table", e);
216         }
217     }
218
219     /**
220      * Stores a batch of records.
221      *
222      * @param entityManager entity manager
223      * @param firstRecord first record to be stored
224      */
225     private void storeBatch(EntityManager entityManager, Record firstRecord) {
226         logger.info("store operation history record batch");
227
228         try (var emc = new EntityMgrCloser(entityManager);
229                         var trans = new EntityTransCloser(entityManager.getTransaction())) {
230
231             var nrecords = 0;
232             var rec = firstRecord;
233
234             while (rec != null && rec != END_MARKER) {
235                 storeRecord(entityManager, rec);
236
237                 if (++nrecords >= batchSize) {
238                     break;
239                 }
240
241                 rec = operations.poll();
242             }
243
244             trans.commit();
245             recordsCommitted += nrecords;
246         }
247     }
248
249     /**
250      * Stores a record.
251      *
252      * @param entityManager entity manager
253      * @param rec record to be stored
254      */
255     private void storeRecord(EntityManager entityMgr, Record rec) {
256
257         final String reqId = rec.getRequestId();
258         final String clName = rec.getClName();
259         final ControlLoopOperation operation = rec.getOperation();
260
261         logger.info("store operation history record for {}", reqId);
262
263         List<OperationsHistory> results = entityMgr
264                         .createQuery("select e from OperationsHistory e" + " where e.closedLoopName= ?1"
265                                         + " and e.requestId= ?2" + " and e.subrequestId= ?3" + " and e.actor= ?4"
266                                         + " and e.operation= ?5" + " and e.target= ?6", OperationsHistory.class)
267                         .setParameter(1, clName).setParameter(2, rec.getRequestId())
268                         .setParameter(3, operation.getSubRequestId()).setParameter(4, operation.getActor())
269                         .setParameter(5, operation.getOperation()).setParameter(6, rec.getTargetEntity())
270                         .getResultList();
271
272         if (results.size() > 1) {
273             logger.warn("unexpected operation history record count {} for {}", results.size(), reqId);
274         }
275
276         OperationsHistory entry = (results.isEmpty() ? new OperationsHistory() : results.get(0));
277
278         entry.setClosedLoopName(clName);
279         entry.setRequestId(rec.getRequestId());
280         entry.setActor(operation.getActor());
281         entry.setOperation(operation.getOperation());
282         entry.setTarget(rec.getTargetEntity());
283         entry.setSubrequestId(operation.getSubRequestId());
284         entry.setMessage(operation.getMessage());
285         entry.setOutcome(operation.getOutcome());
286         if (operation.getStart() != null) {
287             entry.setStarttime(new Date(operation.getStart().toEpochMilli()));
288         } else {
289             entry.setStarttime(null);
290         }
291         if (operation.getEnd() != null) {
292             entry.setEndtime(new Date(operation.getEnd().toEpochMilli()));
293         } else {
294             entry.setEndtime(null);
295         }
296
297         if (results.isEmpty()) {
298             logger.info("insert operation history record for {}", reqId);
299             ++recordsInserted;
300             entityMgr.persist(entry);
301         } else {
302             logger.info("update operation history record for {}", reqId);
303             ++recordsUpdated;
304             entityMgr.merge(entry);
305         }
306     }
307
308     /**
309      * Converts the parameters to Properties.
310      *
311      * @param params parameters to be converted
312      * @return a new property set
313      */
314     private Properties toProperties(OperationHistoryDataManagerParams params) {
315         var props = new Properties();
316         props.put(PersistenceUnitProperties.JDBC_DRIVER, params.getDriver());
317         props.put(PersistenceUnitProperties.JDBC_URL, params.getUrl());
318         props.put(PersistenceUnitProperties.JDBC_USER, params.getUserName());
319         props.put(PersistenceUnitProperties.JDBC_PASSWORD, params.getPassword());
320         props.put(PersistenceUnitProperties.TARGET_DATABASE, params.getDbType());
321         props.put(PersistenceUnitProperties.CLASSLOADER, getClass().getClassLoader());
322
323         return props;
324     }
325
326     @Getter
327     @NoArgsConstructor
328     @AllArgsConstructor
329     @ToString
330     private static class Record {
331         private String requestId;
332         private String clName;
333         private Object event;
334         private String targetEntity;
335         private ControlLoopOperation operation;
336     }
337
338     // the following may be overridden by junit tests
339
340     protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) {
341         return Persistence.createEntityManagerFactory(opsHistPu, props);
342     }
343
344     protected Thread makeThread(EntityManagerFactory emfactory, Consumer<EntityManagerFactory> command) {
345         return new Thread(() -> command.accept(emfactory));
346     }
347 }