5de38a4c39e950380d1d0bfbd6e689a848f094d4
[policy/drools-applications.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.ophistory;
22
23 import java.util.Date;
24 import java.util.Properties;
25 import java.util.concurrent.BlockingQueue;
26 import java.util.concurrent.LinkedBlockingQueue;
27 import java.util.function.Consumer;
28 import javax.persistence.EntityManager;
29 import javax.persistence.EntityManagerFactory;
30 import javax.persistence.Persistence;
31 import lombok.AllArgsConstructor;
32 import lombok.Getter;
33 import lombok.NoArgsConstructor;
34 import lombok.ToString;
35 import org.eclipse.persistence.config.PersistenceUnitProperties;
36 import org.onap.policy.common.parameters.ValidationResult;
37 import org.onap.policy.common.utils.jpa.EntityMgrCloser;
38 import org.onap.policy.common.utils.jpa.EntityTransCloser;
39 import org.onap.policy.controlloop.ControlLoopOperation;
40 import org.onap.policy.controlloop.VirtualControlLoopEvent;
41 import org.onap.policy.database.operationshistory.Dbao;
42 import org.onap.policy.guard.Util;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 /**
47  * Data manager that stores records in the DB, asynchronously, using a background thread.
48  */
49 public class OperationHistoryDataManagerImpl implements OperationHistoryDataManager {
50     private static final Logger logger = LoggerFactory.getLogger(OperationHistoryDataManagerImpl.class);
51
52     /**
53      * Added to the end of {@link #operations} when {@link #stop()} is called. This is
54      * used to get the background thread out of a blocking wait for the next record.
55      */
56     private static final Record END_MARKER = new Record();
57
58     // copied from the parameters
59     private final int maxQueueLength;
60     private final int batchSize;
61
62     private final EntityManagerFactory emFactory;
63
64     /**
65      * Thread that takes records from {@link #operations} and stores them in the DB.
66      */
67     private Thread thread;
68
69     /**
70      * Set to {@code true} to stop the background thread.
71      */
72     private boolean stopped = false;
73
74     /**
75      * Queue of operations waiting to be stored in the DB. When {@link #stop()} is called,
76      * an {@link #END_MARKER} is added to the end of the queue.
77      */
78     private final BlockingQueue<Record> operations = new LinkedBlockingQueue<>();
79
80     /**
81      * Number of records that have been added to the DB by this data manager instance.
82      */
83     @Getter
84     private long recordsAdded = 0;
85
86
87     /**
88      * Constructs the object.
89      *
90      * @param params data manager parameters
91      */
92     public OperationHistoryDataManagerImpl(OperationHistoryDataManagerParams params) {
93         ValidationResult result = params.validate("data-manager-properties");
94         if (!result.isValid()) {
95             throw new IllegalArgumentException(result.getResult());
96         }
97
98         this.maxQueueLength = params.getMaxQueueLength();
99         this.batchSize = params.getBatchSize();
100
101         // create the factory using the properties
102         Properties props = toProperties(params);
103         this.emFactory = makeEntityManagerFactory(params.getPersistenceUnit(), props);
104     }
105
106     @Override
107     public synchronized void start() {
108         if (stopped || thread != null) {
109             // already started
110             return;
111         }
112
113         logger.info("start operation history thread");
114
115         thread = makeThread(emFactory, this::run);
116         thread.setDaemon(true);
117         thread.start();
118     }
119
120     @Override
121     public synchronized void stop() {
122         logger.info("requesting stop of operation history thread");
123
124         stopped = true;
125
126         if (thread == null) {
127             // no thread to close the factory - do it here
128             emFactory.close();
129
130         } else {
131             // the thread will close the factory when it sees the end marker
132             operations.add(END_MARKER);
133         }
134     }
135
136     @Override
137     public synchronized void store(String requestId, VirtualControlLoopEvent event, String targetEntity,
138                     ControlLoopOperation operation) {
139
140         if (stopped) {
141             logger.warn("operation history thread is stopped, discarding requestId={} event={} operation={}", requestId,
142                             event, operation);
143             return;
144         }
145
146         operations.add(new Record(requestId, event, targetEntity, operation));
147
148         if (operations.size() > maxQueueLength) {
149             Record discarded = operations.remove();
150             logger.warn("too many items to store in the operation history table, discarding {}", discarded);
151         }
152     }
153
154     /**
155      * Takes records from {@link #operations} and stores them in the queue. Continues to
156      * run until {@link #stop()} is invoked, or the thread is interrupted.
157      *
158      * @param emfactory entity manager factory
159      */
160     private void run(EntityManagerFactory emfactory) {
161         try {
162             // store records until stopped, continuing if an exception occurs
163             while (!stopped) {
164                 try {
165                     Record triple = operations.take();
166                     storeBatch(emfactory.createEntityManager(), triple);
167
168                 } catch (RuntimeException e) {
169                     logger.error("failed to save data to operation history table", e);
170
171                 } catch (InterruptedException e) {
172                     logger.error("interrupted, discarding remaining operation history data", e);
173                     Thread.currentThread().interrupt();
174                     return;
175                 }
176             }
177
178             storeRemainingRecords(emfactory);
179
180         } finally {
181             synchronized (this) {
182                 stopped = true;
183             }
184
185             emfactory.close();
186         }
187     }
188
189     /**
190      * Store any remaining records, but stop at the first exception.
191      *
192      * @param emfactory entity manager factory
193      */
194     private void storeRemainingRecords(EntityManagerFactory emfactory) {
195         try {
196             while (!operations.isEmpty()) {
197                 storeBatch(emfactory.createEntityManager(), operations.poll());
198             }
199
200         } catch (RuntimeException e) {
201             logger.error("failed to save remaining data to operation history table", e);
202         }
203     }
204
205     /**
206      * Stores a batch of records.
207      *
208      * @param entityManager entity manager
209      * @param firstRecord first record to be stored
210      */
211     private void storeBatch(EntityManager entityManager, Record firstRecord) {
212         logger.info("store operation history record batch");
213
214         try (EntityMgrCloser emc = new EntityMgrCloser(entityManager);
215                         EntityTransCloser trans = new EntityTransCloser(entityManager.getTransaction())) {
216
217             int nrecords = 0;
218             Record record = firstRecord;
219
220             while (record != null && record != END_MARKER) {
221                 storeRecord(entityManager, record);
222
223                 if (++nrecords >= batchSize) {
224                     break;
225                 }
226
227                 record = operations.poll();
228             }
229
230             trans.commit();
231             recordsAdded += nrecords;
232         }
233     }
234
235     /**
236      * Stores a record.
237      *
238      * @param entityManager entity manager
239      * @param record record to be stored
240      */
241     private void storeRecord(EntityManager entityMgr, Record record) {
242         Dbao newEntry = new Dbao();
243
244         final VirtualControlLoopEvent event = record.getEvent();
245         final ControlLoopOperation operation = record.getOperation();
246
247         logger.info("store operation history record for {}", event.getRequestId());
248
249         newEntry.setClosedLoopName(event.getClosedLoopControlName());
250         newEntry.setRequestId(record.getRequestId());
251         newEntry.setActor(operation.getActor());
252         newEntry.setOperation(operation.getOperation());
253         newEntry.setTarget(record.getTargetEntity());
254         newEntry.setSubrequestId(operation.getSubRequestId());
255         newEntry.setMessage(operation.getMessage());
256         newEntry.setOutcome(operation.getOutcome());
257         if (operation.getStart() != null) {
258             newEntry.setStarttime(new Date(operation.getStart().toEpochMilli()));
259         }
260         if (operation.getEnd() != null) {
261             newEntry.setEndtime(new Date(operation.getEnd().toEpochMilli()));
262         }
263
264         entityMgr.persist(newEntry);
265     }
266
267     /**
268      * Converts the parameters to Properties.
269      *
270      * @param params parameters to be converted
271      * @return a new property set
272      */
273     private Properties toProperties(OperationHistoryDataManagerParams params) {
274         Properties props = new Properties();
275         props.put(Util.ECLIPSE_LINK_KEY_URL, params.getUrl());
276         props.put(Util.ECLIPSE_LINK_KEY_USER, params.getUserName());
277         props.put(Util.ECLIPSE_LINK_KEY_PASS, params.getPassword());
278         props.put(PersistenceUnitProperties.CLASSLOADER, getClass().getClassLoader());
279
280         return props;
281     }
282
283     @Getter
284     @NoArgsConstructor
285     @AllArgsConstructor
286     @ToString
287     private static class Record {
288         private String requestId;
289         private VirtualControlLoopEvent event;
290         private String targetEntity;
291         private ControlLoopOperation operation;
292     }
293
294     // the following may be overridden by junit tests
295
296     protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) {
297         return Persistence.createEntityManagerFactory(opsHistPu, props);
298     }
299
300     protected Thread makeThread(EntityManagerFactory emfactory, Consumer<EntityManagerFactory> command) {
301         return new Thread(() -> command.accept(emfactory));
302     }
303 }