f2feef6665cd7a1ed01f2143b08d6d5cbb8a86a9
[policy/drools-applications.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.ophistory;
22
23 import java.util.Date;
24 import java.util.List;
25 import java.util.Properties;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.function.Consumer;
29 import javax.persistence.EntityManager;
30 import javax.persistence.EntityManagerFactory;
31 import javax.persistence.Persistence;
32 import lombok.AllArgsConstructor;
33 import lombok.Getter;
34 import lombok.NoArgsConstructor;
35 import lombok.ToString;
36 import org.eclipse.persistence.config.PersistenceUnitProperties;
37 import org.onap.policy.common.parameters.ValidationResult;
38 import org.onap.policy.common.utils.jpa.EntityMgrCloser;
39 import org.onap.policy.common.utils.jpa.EntityTransCloser;
40 import org.onap.policy.controlloop.ControlLoopOperation;
41 import org.onap.policy.controlloop.VirtualControlLoopEvent;
42 import org.onap.policy.guard.OperationsHistory;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 /**
47  * Data manager that stores records in the DB, asynchronously, using a background thread.
48  */
49 public class OperationHistoryDataManagerImpl implements OperationHistoryDataManager {
50     private static final Logger logger = LoggerFactory.getLogger(OperationHistoryDataManagerImpl.class);
51
52     /**
53      * Added to the end of {@link #operations} when {@link #stop()} is called. This is
54      * used to get the background thread out of a blocking wait for the next record.
55      */
56     private static final Record END_MARKER = new Record();
57
58     // copied from the parameters
59     private final int maxQueueLength;
60     private final int batchSize;
61
62     private final EntityManagerFactory emFactory;
63
64     /**
65      * Thread that takes records from {@link #operations} and stores them in the DB.
66      */
67     private Thread thread;
68
69     /**
70      * Set to {@code true} to stop the background thread.
71      */
72     private boolean stopped = false;
73
74     /**
75      * Queue of operations waiting to be stored in the DB. When {@link #stop()} is called,
76      * an {@link #END_MARKER} is added to the end of the queue.
77      */
78     private final BlockingQueue<Record> operations = new LinkedBlockingQueue<>();
79
80     /**
81      * Number of records that have been processed and committed into the DB by this data
82      * manager instance.
83      */
84     @Getter
85     private long recordsCommitted = 0;
86
87     /**
88      * Number of records that have been inserted into the DB by this data manager
89      * instance, whether or not they were committed.
90      */
91     @Getter
92     private long recordsInserted = 0;
93
94     /**
95      * Number of records that have been updated within the DB by this data manager
96      * instance, whether or not they were committed.
97      */
98     @Getter
99     private long recordsUpdated = 0;
100
101
102     /**
103      * Constructs the object.
104      *
105      * @param params data manager parameters
106      */
107     public OperationHistoryDataManagerImpl(OperationHistoryDataManagerParams params) {
108         ValidationResult result = params.validate("data-manager-properties");
109         if (!result.isValid()) {
110             throw new IllegalArgumentException(result.getResult());
111         }
112
113         this.maxQueueLength = params.getMaxQueueLength();
114         this.batchSize = params.getBatchSize();
115
116         // create the factory using the properties
117         Properties props = toProperties(params);
118         this.emFactory = makeEntityManagerFactory(params.getPersistenceUnit(), props);
119     }
120
121     @Override
122     public synchronized void start() {
123         if (stopped || thread != null) {
124             // already started
125             return;
126         }
127
128         logger.info("start operation history thread");
129
130         thread = makeThread(emFactory, this::run);
131         thread.setDaemon(true);
132         thread.start();
133     }
134
135     @Override
136     public synchronized void stop() {
137         logger.info("requesting stop of operation history thread");
138
139         stopped = true;
140
141         if (thread == null) {
142             // no thread to close the factory - do it here
143             emFactory.close();
144
145         } else {
146             // the thread will close the factory when it sees the end marker
147             operations.add(END_MARKER);
148         }
149     }
150
151     @Override
152     public synchronized void store(String requestId, VirtualControlLoopEvent event, String targetEntity,
153                     ControlLoopOperation operation) {
154
155         if (stopped) {
156             logger.warn("operation history thread is stopped, discarding requestId={} event={} operation={}", requestId,
157                             event, operation);
158             return;
159         }
160
161         operations.add(new Record(requestId, event, targetEntity, operation));
162
163         if (operations.size() > maxQueueLength) {
164             Record discarded = operations.remove();
165             logger.warn("too many items to store in the operation history table, discarding {}", discarded);
166         }
167     }
168
169     /**
170      * Takes records from {@link #operations} and stores them in the queue. Continues to
171      * run until {@link #stop()} is invoked, or the thread is interrupted.
172      *
173      * @param emfactory entity manager factory
174      */
175     private void run(EntityManagerFactory emfactory) {
176         try {
177             // store records until stopped, continuing if an exception occurs
178             while (!stopped) {
179                 try {
180                     Record triple = operations.take();
181                     storeBatch(emfactory.createEntityManager(), triple);
182
183                 } catch (RuntimeException e) {
184                     logger.error("failed to save data to operation history table", e);
185
186                 } catch (InterruptedException e) {
187                     logger.error("interrupted, discarding remaining operation history data", e);
188                     Thread.currentThread().interrupt();
189                     return;
190                 }
191             }
192
193             storeRemainingRecords(emfactory);
194
195         } finally {
196             synchronized (this) {
197                 stopped = true;
198             }
199
200             emfactory.close();
201         }
202     }
203
204     /**
205      * Store any remaining records, but stop at the first exception.
206      *
207      * @param emfactory entity manager factory
208      */
209     private void storeRemainingRecords(EntityManagerFactory emfactory) {
210         try {
211             while (!operations.isEmpty()) {
212                 storeBatch(emfactory.createEntityManager(), operations.poll());
213             }
214
215         } catch (RuntimeException e) {
216             logger.error("failed to save remaining data to operation history table", e);
217         }
218     }
219
220     /**
221      * Stores a batch of records.
222      *
223      * @param entityManager entity manager
224      * @param firstRecord first record to be stored
225      */
226     private void storeBatch(EntityManager entityManager, Record firstRecord) {
227         logger.info("store operation history record batch");
228
229         try (EntityMgrCloser emc = new EntityMgrCloser(entityManager);
230                         EntityTransCloser trans = new EntityTransCloser(entityManager.getTransaction())) {
231
232             int nrecords = 0;
233             Record record = firstRecord;
234
235             while (record != null && record != END_MARKER) {
236                 storeRecord(entityManager, record);
237
238                 if (++nrecords >= batchSize) {
239                     break;
240                 }
241
242                 record = operations.poll();
243             }
244
245             trans.commit();
246             recordsCommitted += nrecords;
247         }
248     }
249
250     /**
251      * Stores a record.
252      *
253      * @param entityManager entity manager
254      * @param record record to be stored
255      */
256     private void storeRecord(EntityManager entityMgr, Record record) {
257
258         final VirtualControlLoopEvent event = record.getEvent();
259         final ControlLoopOperation operation = record.getOperation();
260
261         logger.info("store operation history record for {}", event.getRequestId());
262
263         List<OperationsHistory> results =
264             entityMgr.createQuery("select e from OperationsHistory e"
265                         + " where e.closedLoopName= ?1"
266                         + " and e.requestId= ?2"
267                         + " and e.subrequestId= ?3"
268                         + " and e.actor= ?4"
269                         + " and e.operation= ?5"
270                         + " and e.target= ?6",
271                         OperationsHistory.class)
272                 .setParameter(1, event.getClosedLoopControlName())
273                 .setParameter(2, record.getRequestId())
274                 .setParameter(3, operation.getSubRequestId())
275                 .setParameter(4, operation.getActor())
276                 .setParameter(5, operation.getOperation())
277                 .setParameter(6, record.getTargetEntity())
278                 .getResultList();
279
280         if (results.size() > 1) {
281             logger.warn("unexpected operation history record count {} for {}", results.size(), event.getRequestId());
282         }
283
284         OperationsHistory entry = (results.isEmpty() ? new OperationsHistory() : results.get(0));
285
286         entry.setClosedLoopName(event.getClosedLoopControlName());
287         entry.setRequestId(record.getRequestId());
288         entry.setActor(operation.getActor());
289         entry.setOperation(operation.getOperation());
290         entry.setTarget(record.getTargetEntity());
291         entry.setSubrequestId(operation.getSubRequestId());
292         entry.setMessage(operation.getMessage());
293         entry.setOutcome(operation.getOutcome());
294         if (operation.getStart() != null) {
295             entry.setStarttime(new Date(operation.getStart().toEpochMilli()));
296         } else {
297             entry.setStarttime(null);
298         }
299         if (operation.getEnd() != null) {
300             entry.setEndtime(new Date(operation.getEnd().toEpochMilli()));
301         } else {
302             entry.setEndtime(null);
303         }
304
305         if (results.isEmpty()) {
306             logger.info("insert operation history record for {}", event.getRequestId());
307             ++recordsInserted;
308             entityMgr.persist(entry);
309         } else {
310             logger.info("update operation history record for {}", event.getRequestId());
311             ++recordsUpdated;
312             entityMgr.merge(entry);
313         }
314     }
315
316     /**
317      * Converts the parameters to Properties.
318      *
319      * @param params parameters to be converted
320      * @return a new property set
321      */
322     private Properties toProperties(OperationHistoryDataManagerParams params) {
323         Properties props = new Properties();
324         props.put(PersistenceUnitProperties.JDBC_URL, params.getUrl());
325         props.put(PersistenceUnitProperties.JDBC_USER, params.getUserName());
326         props.put(PersistenceUnitProperties.JDBC_PASSWORD, params.getPassword());
327         props.put(PersistenceUnitProperties.CLASSLOADER, getClass().getClassLoader());
328
329         return props;
330     }
331
332     @Getter
333     @NoArgsConstructor
334     @AllArgsConstructor
335     @ToString
336     private static class Record {
337         private String requestId;
338         private VirtualControlLoopEvent event;
339         private String targetEntity;
340         private ControlLoopOperation operation;
341     }
342
343     // the following may be overridden by junit tests
344
345     protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) {
346         return Persistence.createEntityManagerFactory(opsHistPu, props);
347     }
348
349     protected Thread makeThread(EntityManagerFactory emfactory, Consumer<EntityManagerFactory> command) {
350         return new Thread(() -> command.accept(emfactory));
351     }
352 }