0ec85077d4f7898a6c540d09f31bc7226d52d956
[music.git] / src / main / java / org / onap / music / lockingservice / cassandra / CassaLockStore.java
1 /*
2  * ============LICENSE_START==========================================
3  * org.onap.music
4  * ===================================================================
5  *  Copyright (c) 2017 AT&T Intellectual Property
6  *  Modifications Copyright (C) 2019 IBM.
7  * ===================================================================
8  *  Licensed under the Apache License, Version 2.0 (the "License");
9  *  you may not use this file except in compliance with the License.
10  *  You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  *
20  * ============LICENSE_END=============================================
21  * ====================================================================
22  */
23
24 package org.onap.music.lockingservice.cassandra;
25
26 import java.util.ArrayList;
27 import java.util.Iterator;
28 import java.util.List;
29
30 import org.onap.music.datastore.MusicDataStore;
31 import org.onap.music.datastore.PreparedQueryObject;
32 import org.onap.music.eelf.logging.EELFLoggerDelegate;
33 import org.onap.music.exceptions.MusicLockingException;
34 import org.onap.music.exceptions.MusicQueryException;
35 import org.onap.music.exceptions.MusicServiceException;
36 import org.onap.music.main.DeadlockDetectionUtil;
37 import org.onap.music.main.DeadlockDetectionUtil.OwnershipType;
38 import org.onap.music.main.MusicUtil;
39
40 import com.datastax.driver.core.ResultSet;
41 import com.datastax.driver.core.Row;
42
43 /*
44  * This is the lock store that is built on top of Cassandra that is used by MUSIC to maintain lock state. 
45  */
46
47 public class CassaLockStore {
48     
49     private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(CassaLockStore.class);
50     private static String table_prepend_name = "lockQ_";
51     private MusicDataStore dsHandle;
52     
53     public CassaLockStore() {
54         dsHandle = new MusicDataStore();
55     }
56     
57     public CassaLockStore(MusicDataStore dsHandle) {
58         this.dsHandle=dsHandle;
59     }
60     public class LockObject{
61         private boolean isLockOwner;
62         private String lockRef;
63         private String createTime;
64         private String acquireTime;
65         private LockType locktype;
66         // Owner is the self-declared client which "owns" this row. It is used for deadlock detection.  It is not (directly) related to isLockOwner.
67         private String owner;
68         public LockObject(boolean isLockOwner, String lockRef, String createTime, String acquireTime, LockType locktype, String owner) {
69             this.setIsLockOwner(isLockOwner);
70             this.setLockRef(lockRef);
71             this.setAcquireTime(acquireTime);
72             this.setCreateTime(createTime);
73             this.setLocktype(locktype);
74             this.setOwner(owner);
75         }
76         public boolean getIsLockOwner() {
77             return isLockOwner;
78         }
79         public void setIsLockOwner(boolean isLockOwner) {
80             this.isLockOwner = isLockOwner;
81         }
82         public String getAcquireTime() {
83             return acquireTime;
84         }
85         public void setAcquireTime(String acquireTime) {
86             this.acquireTime = acquireTime;
87         }
88         public String getCreateTime() {
89             return createTime;
90         }
91         public void setCreateTime(String createTime) {
92             this.createTime = createTime;
93         }
94         public String getLockRef() {
95             return lockRef;
96         }
97         public void setLockRef(String lockRef) {
98             this.lockRef = lockRef;
99         }
100         public LockType getLocktype() {
101             return locktype;
102         }
103         public void setLocktype(LockType locktype) {
104             this.locktype = locktype;
105         }
106         public String getOwner() {
107             return owner;
108         }
109         public void setOwner(String owner) {
110             this.owner = owner;
111         }
112     }
113     
114     /**
115      * 
116      * This method creates a shadow locking table for every main table in Cassandra. This table tracks all information regarding locks. 
117      * @param keyspace of the application. 
118      * @param table of the application. 
119      * @return true if the operation was successful.
120      * @throws MusicServiceException
121      * @throws MusicQueryException
122      */
123     public boolean createLockQueue(String keyspace, String table) throws MusicServiceException, MusicQueryException {
124         logger.info(EELFLoggerDelegate.applicationLogger,
125                 "Create lock queue/table for " +  keyspace+"."+table);
126         table = table_prepend_name+table;
127         String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
128                 + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, "
129                 + "writeLock boolean, owner text, PRIMARY KEY ((key), lockReference) ) "
130                 + "WITH CLUSTERING ORDER BY (lockReference ASC);";
131         PreparedQueryObject queryObject = new PreparedQueryObject();
132         
133         queryObject.appendQueryString(tabQuery);
134         boolean result;
135         result = dsHandle.executePut(queryObject, "eventual");
136         return result;
137     }
138
139     /**
140      * This method creates a lock reference for each invocation. The lock references are monotonically increasing timestamps.
141      * @param keyspace of the locks.
142      * @param table of the locks.
143      * @param lockName is the primary key of the lock table
144      * @param lockType is the type of lock (read/write)
145      * @param owner is the owner of the lock (optional, for deadlock detection)
146      * @return the UUID lock reference.
147      * @throws MusicServiceException
148      * @throws MusicQueryException
149      */
150     public String genLockRefandEnQueue(String keyspace, String table, String lockName, LockType locktype, String owner) throws MusicServiceException, MusicQueryException, MusicLockingException {
151         return genLockRefandEnQueue(keyspace, table, lockName, locktype, owner, 0);
152     }
153     
154     private String genLockRefandEnQueue(String keyspace, String table, String lockName, LockType locktype, String owner, int count) throws MusicServiceException, MusicQueryException, MusicLockingException {
155         logger.info(EELFLoggerDelegate.applicationLogger,
156                 "Create " + locktype + " lock reference for " +  keyspace + "." + table + "." + lockName);
157         String lockTable ="";
158         lockTable = table_prepend_name+table;
159     
160         PreparedQueryObject queryObject = new PreparedQueryObject();
161         String selectQuery = "SELECT guard FROM " + keyspace + "." + lockTable + " WHERE key=?;";
162
163         queryObject.addValue(lockName);
164         queryObject.appendQueryString(selectQuery);
165         ResultSet gqResult = dsHandle.executeOneConsistencyGet(queryObject);
166         List<Row> latestGuardRow = gqResult.all();
167
168         long prevGuard = 0;
169         long lockRef = 1;
170         if (!latestGuardRow.isEmpty()) {
171             prevGuard = latestGuardRow.get(0).getLong(0);
172             lockRef = prevGuard + 1;
173         }
174
175         long lockEpochMillis = System.currentTimeMillis();
176
177         logger.info(EELFLoggerDelegate.applicationLogger,
178                 "Created lock reference for " +  keyspace + "." + lockTable + "." + lockName + ":" + lockRef);
179
180         queryObject = new PreparedQueryObject();
181         String insQuery = "BEGIN BATCH" +
182                 " UPDATE " + keyspace + "." + lockTable +
183                 " SET guard=? WHERE key=? IF guard = " + (prevGuard == 0 ? "NULL" : "?") +";" +
184                 " INSERT INTO " + keyspace + "." + lockTable +
185                 "(key, lockReference, createTime, acquireTime, writeLock, owner) VALUES (?,?,?,?,?,?) IF NOT EXISTS; APPLY BATCH;";
186
187         queryObject.addValue(lockRef);
188         queryObject.addValue(lockName);
189         if (prevGuard != 0)
190             queryObject.addValue(prevGuard);
191
192         queryObject.addValue(lockName);
193         queryObject.addValue(lockRef);
194         queryObject.addValue(String.valueOf(lockEpochMillis));
195         queryObject.addValue("0");
196         queryObject.addValue(locktype==LockType.WRITE ? true : false );
197         queryObject.addValue(owner);
198         queryObject.appendQueryString(insQuery);
199         boolean pResult = dsHandle.executePut(queryObject, "critical");
200         if (!pResult) {// couldn't create lock ref, retry
201             count++;
202             if (count > MusicUtil.getRetryCount()) {
203                 logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to create lock reference");
204                 throw new MusicLockingException("Unable to create lock reference");
205             }
206             return genLockRefandEnQueue(keyspace, table, lockName, locktype, owner, count);
207         }
208         return "$" + keyspace + "." + table + "." + lockName + "$" + String.valueOf(lockRef);
209     }
210
211         /**
212      * Returns a result set containing the list of clients waiting for a particular lock
213      * 
214      * @param keyspace
215      * @param table
216      * @param key
217      * @return list of lockrefs in the queue
218      * @throws MusicServiceException
219      * @throws MusicQueryException
220      */
221     public List<String> getLockQueue(String keyspace, String table, String key)
222             throws MusicServiceException, MusicQueryException {
223         logger.info(EELFLoggerDelegate.applicationLogger,
224                 "Getting the queue for " + keyspace + "." + table + "." + key);
225         table = table_prepend_name + table;
226         String selectQuery = "select * from " + keyspace + "." + table + " where key='" + key + "';";
227         PreparedQueryObject queryObject = new PreparedQueryObject();
228         queryObject.appendQueryString(selectQuery);
229         ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
230         ArrayList<String> lockQueue = new ArrayList<>();
231         for (Row row : rs) {
232             lockQueue.add(Long.toString(row.getLong("lockReference")));
233         }
234         return lockQueue;
235     }
236
237
238     /**
239      * Returns a result set containing the list of clients waiting for a particular lock
240      * 
241      * @param keyspace
242      * @param table
243      * @param key
244      * @return size of lockrefs queue
245      * @throws MusicServiceException
246      * @throws MusicQueryException
247      */
248     public long getLockQueueSize(String keyspace, String table, String key)
249             throws MusicServiceException, MusicQueryException {
250         logger.info(EELFLoggerDelegate.applicationLogger,
251                 "Getting the queue size for " + keyspace + "." + table + "." + key);
252         table = table_prepend_name + table;
253         String selectQuery = "select count(*) from " + keyspace + "." + table + " where key='" + key + "';";
254         PreparedQueryObject queryObject = new PreparedQueryObject();
255         queryObject.appendQueryString(selectQuery);
256                 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
257                 return rs.one().getLong("count");
258         }
259
260
261     /**
262      * This method returns the top of lock table/queue for the key.
263      * 
264      * @param keyspace of the application.
265      * @param table of the application.
266      * @param key is the primary key of the application table
267      * @return the UUID lock reference. Returns LockObject.isLockOwner=false if there is no owner or the
268      *         lock doesn't exist
269      * @throws MusicServiceException
270      * @throws MusicQueryException
271      */
272     public LockObject peekLockQueue(String keyspace, String table, String key)
273             throws MusicServiceException, MusicQueryException {
274         logger.info(EELFLoggerDelegate.applicationLogger,
275                 "Peek in lock table for " + keyspace + "." + table + "." + key);
276         table = table_prepend_name + table;
277         String selectQuery = "select * from " + keyspace + "." + table + " where key='" + key + "' LIMIT 1;";
278         PreparedQueryObject queryObject = new PreparedQueryObject();
279         queryObject.appendQueryString(selectQuery);
280         ResultSet results = dsHandle.executeOneConsistencyGet(queryObject);
281         Row row = results.one();
282         if (row == null || row.isNull("lockReference")) {
283             return new LockObject(false, null, null, null, null, null);
284         }
285         String lockReference = "" + row.getLong("lockReference");
286         String createTime = row.getString("createTime");
287         String acquireTime = row.getString("acquireTime");
288         LockType locktype = row.isNull("writeLock") || row.getBool("writeLock") ? LockType.WRITE : LockType.READ;
289         String owner = row.getString("owner");
290
291         return new LockObject(true, lockReference, createTime, acquireTime, locktype, owner);
292     }
293
294     public List<String> getCurrentLockHolders(String keyspace, String table, String key)
295             throws MusicServiceException, MusicQueryException {
296         logger.info(EELFLoggerDelegate.applicationLogger,
297                 "Getting lockholders in lock table for " + keyspace + "." + table + "." + key);
298         String origTable = table;
299         table = table_prepend_name + table;
300         String selectQuery = "select * from " + keyspace + "." + table + " where key=?;";
301         List<String> lockHolders = new ArrayList<>();
302         PreparedQueryObject queryObject = new PreparedQueryObject();
303         queryObject.appendQueryString(selectQuery);
304         queryObject.addValue(key);
305         ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
306         boolean topOfQueue = true;
307         StringBuilder lock = new StringBuilder().
308         append("$").append(keyspace).append(".").append(origTable).
309         append(".").append(key).append("$");
310         StringBuilder lockReference = new StringBuilder();
311         for (Row row : rs) {
312                 if ( row.isNull("lockReference") ) {
313                     return lockHolders;
314                 }
315                 lockReference.append(lock).append(row.getLong("lockReference"));
316             if (row.isNull("writeLock") || row.getBool("writeLock")) {
317                 if (topOfQueue) {
318                     lockHolders.add(lockReference.toString());
319                     break;
320                 } else {
321                     break;
322                 }
323             }
324             // read lock
325             lockHolders.add(lockReference.toString());
326
327             topOfQueue = false;
328             lockReference.delete(0,lockReference.length());
329         }
330         return lockHolders;
331     }
332
333     /**
334      * Determine if the lock is a valid current lock holder.
335      * 
336      * @param keyspace
337      * @param table
338      * @param key
339      * @param lockRef
340      * @return true if lockRef is a lock owner of key
341      * @throws MusicServiceException
342      * @throws MusicQueryException
343      */
344     public boolean isLockOwner(String keyspace, String table, String key, String lockRef)
345             throws MusicServiceException, MusicQueryException {
346         logger.info(EELFLoggerDelegate.applicationLogger,
347                 "Checking in lock table for " + keyspace + "." + table + "." + key);
348         table = table_prepend_name + table;
349         String selectQuery = 
350                 "select * from " + keyspace + "." + table + " where key=?;";
351         PreparedQueryObject queryObject = new PreparedQueryObject();
352         queryObject.appendQueryString(selectQuery);
353         queryObject.addValue(key);
354         ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
355
356         boolean topOfQueue = true;
357         for (Row row : rs) {
358             String lockReference = "" + row.getLong("lockReference");
359             if (row.isNull("writeLock") || row.getBool("writeLock")) {
360                 if (topOfQueue && lockRef.equals(lockReference)) {
361                     return true;
362                 } else {
363                     return false;
364                 }
365             }
366             if (lockRef.equals(lockReference)) {
367                 return true;
368             }
369             topOfQueue = false;
370         }
371         logger.info(EELFLoggerDelegate.applicationLogger, "Could not find " + lockRef
372                 + " in the lock queue. It has expired and no longer exists.");
373         return false;
374     }
375
376     /**
377      * Determine if the lock is a valid current lock holder.
378      * 
379      * @param keyspace
380      * @param table
381      * @param key
382      * @param lockRef
383      * @return true if lockRef is a lock owner of key
384      * @throws MusicServiceException
385      * @throws MusicQueryException
386      */
387     public LockObject getLockInfo(String keyspace, String table, String key, String lockRef)
388             throws MusicServiceException, MusicQueryException {
389         logger.info(EELFLoggerDelegate.applicationLogger,
390                 "Checking in lock table for " + keyspace + "." + table + "." + key);
391         String lockQ_table = table_prepend_name + table;
392         String selectQuery = 
393                 "select * from " + keyspace + "." + lockQ_table + " where key=? and lockReference=?;";
394         PreparedQueryObject queryObject = new PreparedQueryObject();
395         queryObject.appendQueryString(selectQuery);
396         queryObject.addValue(key);
397         queryObject.addValue(Long.parseLong(lockRef));
398         ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
399         Row row = rs.one();
400         if (row == null || row.isNull("lockReference")) {
401             return null;
402         }
403
404         String lockReference = "" + row.getLong("lockReference");
405         String createTime = row.getString("createTime");
406         String acquireTime = row.getString("acquireTime");
407         LockType locktype = row.isNull("writeLock") || row.getBool("writeLock") ? LockType.WRITE : LockType.READ;
408         boolean isLockOwner = isLockOwner(keyspace, table, key, lockRef);
409         String owner = row.getString("owner");
410
411         return new LockObject(isLockOwner, lockReference, createTime, acquireTime, locktype, owner);
412     }
413
414
415
416     /**
417      * This method removes the lock ref from the lock table/queue for the key.
418      * 
419      * @param keyspace of the application.
420      * @param table of the application.
421      * @param key is the primary key of the application table
422      * @param lockReference the lock reference that needs to be dequeued.
423      * @throws MusicServiceException
424      * @throws MusicQueryException
425      * @throws MusicLockingException
426      */
427     public void deQueueLockRef(String keyspace, String table, String key, String lockReference, int n)
428             throws MusicServiceException, MusicQueryException, MusicLockingException {
429         String prependTable = table_prepend_name + table;
430         PreparedQueryObject queryObject = new PreparedQueryObject();
431         Long lockReferenceL = Long.parseLong(lockReference.substring(lockReference.lastIndexOf("$") + 1));
432         String deleteQuery = "delete from " + keyspace + "." + prependTable + " where key='" + key
433                 + "' AND lockReference =" + lockReferenceL + " IF EXISTS;";
434         queryObject.appendQueryString(deleteQuery);
435         logger.info(EELFLoggerDelegate.applicationLogger, "Removing lock for key: "+key+ " and reference: "+lockReference);
436         try {
437             dsHandle.executePut(queryObject, "critical");
438             logger.info(EELFLoggerDelegate.applicationLogger,
439                     "Lock removed for key: " + key + " and reference: " + lockReference);
440         } catch (MusicServiceException ex) {
441             logger.error(logger, ex.getMessage(), ex);
442             logger.error(EELFLoggerDelegate.applicationLogger,
443                     "Exception while deQueueLockRef for lockname: " + key + " reference:" + lockReference);
444             if (n > 1) {
445                 logger.info(EELFLoggerDelegate.applicationLogger, "Trying again...");
446                 deQueueLockRef(keyspace, table, key, lockReference, n - 1);
447             } else {
448                 logger.error(EELFLoggerDelegate.applicationLogger,
449                         "deQueueLockRef failed for lockname: " + key + " reference:" + lockReference);
450                 logger.error(logger, ex.getMessage(), ex);
451                 throw new MusicLockingException("Error while deQueueLockRef: " + ex.getMessage());
452             }
453         }
454     }
455
456
457     public void updateLockAcquireTime(String keyspace, String table, String key, String lockReference) {
458         table = table_prepend_name + table;
459         PreparedQueryObject queryObject = new PreparedQueryObject();
460         Long lockReferenceL = Long.parseLong(lockReference);
461         String updateQuery = "update " + keyspace + "." + table + " set acquireTime='" + System.currentTimeMillis()
462                 + "' where key='" + key + "' AND lockReference = " + lockReferenceL + " IF EXISTS;";
463         queryObject.appendQueryString(updateQuery);
464
465         //cannot use executePut because we need to ignore music timestamp adjustments for lock store
466         dsHandle.getSession().execute(updateQuery);
467     }  
468
469     public boolean checkForDeadlock(String keyspace, String table, String lockName, LockType locktype, String owner, boolean forAcquire) throws MusicServiceException, MusicQueryException {
470         if (locktype.equals(LockType.READ)) return false;
471         if (owner==null || owner.length()==0) return false;
472
473         String lockTable = table_prepend_name + table;
474         PreparedQueryObject queryObject = new PreparedQueryObject();
475         queryObject.appendQueryString("SELECT key, acquiretime, owner FROM " + keyspace + "." + lockTable);
476         queryObject.appendQueryString(" WHERE writelock = True ALLOW FILTERING");
477
478         DeadlockDetectionUtil ddu = new DeadlockDetectionUtil();
479
480         ResultSet rs = dsHandle.executeQuorumConsistencyGet(queryObject);
481         logger.debug("rs has " + rs.getAvailableWithoutFetching() + (rs.isFullyFetched()?"":" (or more!)") );
482         Iterator<Row> it = rs.iterator();
483         while (it.hasNext()) {
484             Row row = it.next();
485             logger.debug("key = " + row.getString("key") + ", time = " + row.getString("acquiretime") + ", owner = " + row.getString("owner") );
486             ddu.setExisting(row.getString("key"), row.getString("owner"), ("0".equals(row.getString("acquiretime")))?OwnershipType.CREATED:OwnershipType.ACQUIRED);
487         }
488         boolean deadlock = ddu.checkForDeadlock(lockName, owner, forAcquire?OwnershipType.ACQUIRED:OwnershipType.CREATED);
489         if (deadlock) logger.warn("Deadlock detected when " + owner + " tried to create lock on " + keyspace + "." + lockTable + "." + lockName);
490         return deadlock;
491     }
492
493     public List<String> getAllLocksForOwner(String ownerId, String keyspace, String table) throws MusicServiceException, MusicQueryException {
494         List<String> toRet = new ArrayList<String>();
495         String lockTable = table_prepend_name + table;
496         PreparedQueryObject queryObject = new PreparedQueryObject();
497         queryObject.appendQueryString("SELECT key, lockreference FROM " + keyspace + "." + lockTable);
498         queryObject.appendQueryString(" WHERE owner = '" + ownerId + "' ALLOW FILTERING");
499
500         ResultSet rs = dsHandle.executeQuorumConsistencyGet(queryObject);
501         Iterator<Row> it = rs.iterator();
502         while (it.hasNext()) {
503             Row row = it.next();
504             toRet.add(row.getString("key") + "$" + row.getLong("lockreference"));
505         }
506         return toRet;
507     }
508
509
510 }