Use OperationsHistory from models
[policy/drools-applications.git] / controlloop / common / eventmanager / src / main / java / org / onap / policy / controlloop / ophistory / OperationHistoryDataManagerImpl.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.ophistory;
22
23 import java.util.Date;
24 import java.util.List;
25 import java.util.Properties;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.function.Consumer;
29 import javax.persistence.EntityManager;
30 import javax.persistence.EntityManagerFactory;
31 import javax.persistence.Persistence;
32 import lombok.AllArgsConstructor;
33 import lombok.Getter;
34 import lombok.NoArgsConstructor;
35 import lombok.ToString;
36 import org.eclipse.persistence.config.PersistenceUnitProperties;
37 import org.onap.policy.common.parameters.ValidationResult;
38 import org.onap.policy.common.utils.jpa.EntityMgrCloser;
39 import org.onap.policy.common.utils.jpa.EntityTransCloser;
40 import org.onap.policy.controlloop.ControlLoopOperation;
41 import org.onap.policy.controlloop.VirtualControlLoopEvent;
42 import org.onap.policy.guard.OperationsHistory;
43 import org.onap.policy.guard.Util;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 /**
48  * Data manager that stores records in the DB, asynchronously, using a background thread.
49  */
50 public class OperationHistoryDataManagerImpl implements OperationHistoryDataManager {
51     private static final Logger logger = LoggerFactory.getLogger(OperationHistoryDataManagerImpl.class);
52
53     /**
54      * Added to the end of {@link #operations} when {@link #stop()} is called. This is
55      * used to get the background thread out of a blocking wait for the next record.
56      */
57     private static final Record END_MARKER = new Record();
58
59     // copied from the parameters
60     private final int maxQueueLength;
61     private final int batchSize;
62
63     private final EntityManagerFactory emFactory;
64
65     /**
66      * Thread that takes records from {@link #operations} and stores them in the DB.
67      */
68     private Thread thread;
69
70     /**
71      * Set to {@code true} to stop the background thread.
72      */
73     private boolean stopped = false;
74
75     /**
76      * Queue of operations waiting to be stored in the DB. When {@link #stop()} is called,
77      * an {@link #END_MARKER} is added to the end of the queue.
78      */
79     private final BlockingQueue<Record> operations = new LinkedBlockingQueue<>();
80
81     /**
82      * Number of records that have been processed and committed into the DB by this data
83      * manager instance.
84      */
85     @Getter
86     private long recordsCommitted = 0;
87
88     /**
89      * Number of records that have been inserted into the DB by this data manager
90      * instance, whether or not they were committed.
91      */
92     @Getter
93     private long recordsInserted = 0;
94
95     /**
96      * Number of records that have been updated within the DB by this data manager
97      * instance, whether or not they were committed.
98      */
99     @Getter
100     private long recordsUpdated = 0;
101
102
103     /**
104      * Constructs the object.
105      *
106      * @param params data manager parameters
107      */
108     public OperationHistoryDataManagerImpl(OperationHistoryDataManagerParams params) {
109         ValidationResult result = params.validate("data-manager-properties");
110         if (!result.isValid()) {
111             throw new IllegalArgumentException(result.getResult());
112         }
113
114         this.maxQueueLength = params.getMaxQueueLength();
115         this.batchSize = params.getBatchSize();
116
117         // create the factory using the properties
118         Properties props = toProperties(params);
119         this.emFactory = makeEntityManagerFactory(params.getPersistenceUnit(), props);
120     }
121
122     @Override
123     public synchronized void start() {
124         if (stopped || thread != null) {
125             // already started
126             return;
127         }
128
129         logger.info("start operation history thread");
130
131         thread = makeThread(emFactory, this::run);
132         thread.setDaemon(true);
133         thread.start();
134     }
135
136     @Override
137     public synchronized void stop() {
138         logger.info("requesting stop of operation history thread");
139
140         stopped = true;
141
142         if (thread == null) {
143             // no thread to close the factory - do it here
144             emFactory.close();
145
146         } else {
147             // the thread will close the factory when it sees the end marker
148             operations.add(END_MARKER);
149         }
150     }
151
152     @Override
153     public synchronized void store(String requestId, VirtualControlLoopEvent event, String targetEntity,
154                     ControlLoopOperation operation) {
155
156         if (stopped) {
157             logger.warn("operation history thread is stopped, discarding requestId={} event={} operation={}", requestId,
158                             event, operation);
159             return;
160         }
161
162         operations.add(new Record(requestId, event, targetEntity, operation));
163
164         if (operations.size() > maxQueueLength) {
165             Record discarded = operations.remove();
166             logger.warn("too many items to store in the operation history table, discarding {}", discarded);
167         }
168     }
169
170     /**
171      * Takes records from {@link #operations} and stores them in the queue. Continues to
172      * run until {@link #stop()} is invoked, or the thread is interrupted.
173      *
174      * @param emfactory entity manager factory
175      */
176     private void run(EntityManagerFactory emfactory) {
177         try {
178             // store records until stopped, continuing if an exception occurs
179             while (!stopped) {
180                 try {
181                     Record triple = operations.take();
182                     storeBatch(emfactory.createEntityManager(), triple);
183
184                 } catch (RuntimeException e) {
185                     logger.error("failed to save data to operation history table", e);
186
187                 } catch (InterruptedException e) {
188                     logger.error("interrupted, discarding remaining operation history data", e);
189                     Thread.currentThread().interrupt();
190                     return;
191                 }
192             }
193
194             storeRemainingRecords(emfactory);
195
196         } finally {
197             synchronized (this) {
198                 stopped = true;
199             }
200
201             emfactory.close();
202         }
203     }
204
205     /**
206      * Store any remaining records, but stop at the first exception.
207      *
208      * @param emfactory entity manager factory
209      */
210     private void storeRemainingRecords(EntityManagerFactory emfactory) {
211         try {
212             while (!operations.isEmpty()) {
213                 storeBatch(emfactory.createEntityManager(), operations.poll());
214             }
215
216         } catch (RuntimeException e) {
217             logger.error("failed to save remaining data to operation history table", e);
218         }
219     }
220
221     /**
222      * Stores a batch of records.
223      *
224      * @param entityManager entity manager
225      * @param firstRecord first record to be stored
226      */
227     private void storeBatch(EntityManager entityManager, Record firstRecord) {
228         logger.info("store operation history record batch");
229
230         try (EntityMgrCloser emc = new EntityMgrCloser(entityManager);
231                         EntityTransCloser trans = new EntityTransCloser(entityManager.getTransaction())) {
232
233             int nrecords = 0;
234             Record record = firstRecord;
235
236             while (record != null && record != END_MARKER) {
237                 storeRecord(entityManager, record);
238
239                 if (++nrecords >= batchSize) {
240                     break;
241                 }
242
243                 record = operations.poll();
244             }
245
246             trans.commit();
247             recordsCommitted += nrecords;
248         }
249     }
250
251     /**
252      * Stores a record.
253      *
254      * @param entityManager entity manager
255      * @param record record to be stored
256      */
257     private void storeRecord(EntityManager entityMgr, Record record) {
258
259         final VirtualControlLoopEvent event = record.getEvent();
260         final ControlLoopOperation operation = record.getOperation();
261
262         logger.info("store operation history record for {}", event.getRequestId());
263
264         List<OperationsHistory> results =
265             entityMgr.createQuery("select e from OperationsHistory e"
266                         + " where e.closedLoopName= ?1"
267                         + " and e.requestId= ?2"
268                         + " and e.subrequestId= ?3"
269                         + " and e.actor= ?4"
270                         + " and e.operation= ?5"
271                         + " and e.target= ?6",
272                         OperationsHistory.class)
273                 .setParameter(1, event.getClosedLoopControlName())
274                 .setParameter(2, record.getRequestId())
275                 .setParameter(3, operation.getSubRequestId())
276                 .setParameter(4, operation.getActor())
277                 .setParameter(5, operation.getOperation())
278                 .setParameter(6, record.getTargetEntity())
279                 .getResultList();
280
281         if (results.size() > 1) {
282             logger.warn("unexpected operation history record count {} for {}", results.size(), event.getRequestId());
283         }
284
285         OperationsHistory entry = (results.isEmpty() ? new OperationsHistory() : results.get(0));
286
287         entry.setClosedLoopName(event.getClosedLoopControlName());
288         entry.setRequestId(record.getRequestId());
289         entry.setActor(operation.getActor());
290         entry.setOperation(operation.getOperation());
291         entry.setTarget(record.getTargetEntity());
292         entry.setSubrequestId(operation.getSubRequestId());
293         entry.setMessage(operation.getMessage());
294         entry.setOutcome(operation.getOutcome());
295         if (operation.getStart() != null) {
296             entry.setStarttime(new Date(operation.getStart().toEpochMilli()));
297         } else {
298             entry.setStarttime(null);
299         }
300         if (operation.getEnd() != null) {
301             entry.setEndtime(new Date(operation.getEnd().toEpochMilli()));
302         } else {
303             entry.setEndtime(null);
304         }
305
306         if (results.isEmpty()) {
307             logger.info("insert operation history record for {}", event.getRequestId());
308             ++recordsInserted;
309             entityMgr.persist(entry);
310         } else {
311             logger.info("update operation history record for {}", event.getRequestId());
312             ++recordsUpdated;
313             entityMgr.merge(entry);
314         }
315     }
316
317     /**
318      * Converts the parameters to Properties.
319      *
320      * @param params parameters to be converted
321      * @return a new property set
322      */
323     private Properties toProperties(OperationHistoryDataManagerParams params) {
324         Properties props = new Properties();
325         props.put(Util.ECLIPSE_LINK_KEY_URL, params.getUrl());
326         props.put(Util.ECLIPSE_LINK_KEY_USER, params.getUserName());
327         props.put(Util.ECLIPSE_LINK_KEY_PASS, params.getPassword());
328         props.put(PersistenceUnitProperties.CLASSLOADER, getClass().getClassLoader());
329
330         return props;
331     }
332
333     @Getter
334     @NoArgsConstructor
335     @AllArgsConstructor
336     @ToString
337     private static class Record {
338         private String requestId;
339         private VirtualControlLoopEvent event;
340         private String targetEntity;
341         private ControlLoopOperation operation;
342     }
343
344     // the following may be overridden by junit tests
345
346     protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) {
347         return Persistence.createEntityManagerFactory(opsHistPu, props);
348     }
349
350     protected Thread makeThread(EntityManagerFactory emfactory, Consumer<EntityManagerFactory> command) {
351         return new Thread(() -> command.accept(emfactory));
352     }
353 }