06a087ab2b775428cd01902f4bbb84e40e740b23
[music.git] / src / main / java / org / onap / music / lockingservice / cassandra / CassaLockStore.java
1 /*
2  * ============LICENSE_START==========================================
3  * org.onap.music
4  * ===================================================================
5  *  Copyright (c) 2017 AT&T Intellectual Property
6  *  Modifications Copyright (C) 2019 IBM.
7  * ===================================================================
8  *  Licensed under the Apache License, Version 2.0 (the "License");
9  *  you may not use this file except in compliance with the License.
10  *  You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  *
20  * ============LICENSE_END=============================================
21  * ====================================================================
22  */
23
24 package org.onap.music.lockingservice.cassandra;
25
26 import java.util.ArrayList;
27 import java.util.List;
28
29 import org.onap.music.datastore.MusicDataStore;
30 import org.onap.music.datastore.PreparedQueryObject;
31 import org.onap.music.eelf.logging.EELFLoggerDelegate;
32 import org.onap.music.exceptions.MusicLockingException;
33 import org.onap.music.exceptions.MusicQueryException;
34 import org.onap.music.exceptions.MusicServiceException;
35 import org.onap.music.main.MusicUtil;
36
37 import com.datastax.driver.core.ResultSet;
38 import com.datastax.driver.core.Row;
39
40 /*
41  * This is the lock store that is built on top of Cassandra that is used by MUSIC to maintain lock state. 
42  */
43
44 public class CassaLockStore {
45     
46     private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(CassaLockStore.class);
47     private static String table_prepend_name = "lockQ_";
48     private MusicDataStore dsHandle;
49     
50     public CassaLockStore() {
51         dsHandle = new MusicDataStore();
52     }
53     
54     public CassaLockStore(MusicDataStore dsHandle) {
55         this.dsHandle=dsHandle;
56     }
57     public class LockObject{
58         private boolean isLockOwner;
59         private String lockRef;
60         private String createTime;
61         private String acquireTime;
62         private LockType locktype;
63         public LockObject(boolean isLockOwner, String lockRef, String createTime, String acquireTime, LockType locktype) {
64             this.setIsLockOwner(isLockOwner);
65             this.setLockRef(lockRef);
66             this.setAcquireTime(acquireTime);
67             this.setCreateTime(createTime);
68             this.setLocktype(locktype);
69         }
70         public boolean getIsLockOwner() {
71             return isLockOwner;
72         }
73         public void setIsLockOwner(boolean isLockOwner) {
74             this.isLockOwner = isLockOwner;
75         }
76         public String getAcquireTime() {
77             return acquireTime;
78         }
79         public void setAcquireTime(String acquireTime) {
80             this.acquireTime = acquireTime;
81         }
82         public String getCreateTime() {
83             return createTime;
84         }
85         public void setCreateTime(String createTime) {
86             this.createTime = createTime;
87         }
88         public String getLockRef() {
89             return lockRef;
90         }
91         public void setLockRef(String lockRef) {
92             this.lockRef = lockRef;
93         }
94         public LockType getLocktype() {
95             return locktype;
96         }
97         public void setLocktype(LockType locktype) {
98             this.locktype = locktype;
99         }
100     }
101     
102     /**
103      * 
104      * This method creates a shadow locking table for every main table in Cassandra. This table tracks all information regarding locks. 
105      * @param keyspace of the application. 
106      * @param table of the application. 
107      * @return true if the operation was successful.
108      * @throws MusicServiceException
109      * @throws MusicQueryException
110      */
111     public boolean createLockQueue(String keyspace, String table) throws MusicServiceException, MusicQueryException {
112         logger.info(EELFLoggerDelegate.applicationLogger,
113                 "Create lock queue/table for " +  keyspace+"."+table);
114         table = table_prepend_name+table;
115         String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
116                 + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, "
117                 + "writeLock boolean, PRIMARY KEY ((key), lockReference) ) "
118                 + "WITH CLUSTERING ORDER BY (lockReference ASC);";
119         PreparedQueryObject queryObject = new PreparedQueryObject();
120         
121         queryObject.appendQueryString(tabQuery);
122         boolean result;
123         result = dsHandle.executePut(queryObject, "eventual");
124         return result;
125     }
126
127     /**
128      * This method creates a lock reference for each invocation. The lock references are monotonically increasing timestamps.
129      * @param keyspace of the locks.
130      * @param table of the locks.
131      * @param lockName is the primary key of the lock table
132      * @return the UUID lock reference.
133      * @throws MusicServiceException
134      * @throws MusicQueryException
135      */
136     public String genLockRefandEnQueue(String keyspace, String table, String lockName, LockType locktype) throws MusicServiceException, MusicQueryException, MusicLockingException {
137         return genLockRefandEnQueue(keyspace, table, lockName, locktype, 0);
138     }
139     
140     private String genLockRefandEnQueue(String keyspace, String table, String lockName, LockType locktype, int count) throws MusicServiceException, MusicQueryException, MusicLockingException {
141         logger.info(EELFLoggerDelegate.applicationLogger,
142                 "Create " + locktype + " lock reference for " +  keyspace + "." + table + "." + lockName);
143         String lockTable ="";
144         lockTable = table_prepend_name+table;
145     
146
147
148         PreparedQueryObject queryObject = new PreparedQueryObject();
149         String selectQuery = "SELECT guard FROM " + keyspace + "." + lockTable + " WHERE key=?;";
150
151         queryObject.addValue(lockName);
152         queryObject.appendQueryString(selectQuery);
153         ResultSet gqResult = dsHandle.executeOneConsistencyGet(queryObject);
154         List<Row> latestGuardRow = gqResult.all();
155
156         long prevGuard = 0;
157         long lockRef = 1;
158         if (!latestGuardRow.isEmpty()) {
159             prevGuard = latestGuardRow.get(0).getLong(0);
160             lockRef = prevGuard + 1;
161         }
162
163         long lockEpochMillis = System.currentTimeMillis();
164
165         logger.info(EELFLoggerDelegate.applicationLogger,
166                 "Created lock reference for " +  keyspace + "." + lockTable + "." + lockName + ":" + lockRef);
167
168         queryObject = new PreparedQueryObject();
169         String insQuery = "BEGIN BATCH" +
170                 " UPDATE " + keyspace + "." + lockTable +
171                 " SET guard=? WHERE key=? IF guard = " + (prevGuard == 0 ? "NULL" : "?") +";" +
172                 " INSERT INTO " + keyspace + "." + lockTable +
173                 "(key, lockReference, createTime, acquireTime, writeLock) VALUES (?,?,?,?,?) IF NOT EXISTS; APPLY BATCH;";
174
175         queryObject.addValue(lockRef);
176         queryObject.addValue(lockName);
177         if (prevGuard != 0)
178             queryObject.addValue(prevGuard);
179
180         queryObject.addValue(lockName);
181         queryObject.addValue(lockRef);
182         queryObject.addValue(String.valueOf(lockEpochMillis));
183         queryObject.addValue("0");
184         queryObject.addValue(locktype==LockType.WRITE ? true : false );
185         queryObject.appendQueryString(insQuery);
186         boolean pResult = dsHandle.executePut(queryObject, "critical");
187         if (!pResult) {// couldn't create lock ref, retry
188             count++;
189             if (count > MusicUtil.getRetryCount()) {
190                 logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to create lock reference");
191                 throw new MusicLockingException("Unable to create lock reference");
192             }
193             return genLockRefandEnQueue(keyspace, table, lockName, locktype, count);
194         }
195         return "$" + keyspace + "." + table + "." + lockName + "$" + String.valueOf(lockRef);
196     }
197     
198     
199
200     /**
201      * Returns a result set containing the list of clients waiting for a particular lock
202      * 
203      * @param keyspace
204      * @param table
205      * @param key
206      * @return list of lockrefs in the queue
207      * @throws MusicServiceException
208      * @throws MusicQueryException
209      */
210     public List<String> getLockQueue(String keyspace, String table, String key)
211             throws MusicServiceException, MusicQueryException {
212         logger.info(EELFLoggerDelegate.applicationLogger,
213                 "Getting the queue for " + keyspace + "." + table + "." + key);
214         table = table_prepend_name + table;
215         String selectQuery = "select * from " + keyspace + "." + table + " where key='" + key + "';";
216         PreparedQueryObject queryObject = new PreparedQueryObject();
217         queryObject.appendQueryString(selectQuery);
218         ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
219         ArrayList<String> lockQueue = new ArrayList<>();
220         for (Row row : rs) {
221             lockQueue.add(Long.toString(row.getLong("lockReference")));
222         }
223         return lockQueue;
224     }
225
226
227     /**
228      * Returns a result set containing the list of clients waiting for a particular lock
229      * 
230      * @param keyspace
231      * @param table
232      * @param key
233      * @return size of lockrefs queue
234      * @throws MusicServiceException
235      * @throws MusicQueryException
236      */
237     public long getLockQueueSize(String keyspace, String table, String key)
238             throws MusicServiceException, MusicQueryException {
239         logger.info(EELFLoggerDelegate.applicationLogger,
240                 "Getting the queue size for " + keyspace + "." + table + "." + key);
241         table = table_prepend_name + table;
242         String selectQuery = "select count(*) from " + keyspace + "." + table + " where key='" + key + "';";
243         PreparedQueryObject queryObject = new PreparedQueryObject();
244         queryObject.appendQueryString(selectQuery);
245                 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
246                 return rs.one().getLong("count");
247         }
248
249
250     /**
251      * This method returns the top of lock table/queue for the key.
252      * 
253      * @param keyspace of the application.
254      * @param table of the application.
255      * @param key is the primary key of the application table
256      * @return the UUID lock reference. Returns LockObject.isLockOwner=false if there is no owner or the
257      *         lock doesn't exist
258      * @throws MusicServiceException
259      * @throws MusicQueryException
260      */
261     public LockObject peekLockQueue(String keyspace, String table, String key)
262             throws MusicServiceException, MusicQueryException {
263         logger.info(EELFLoggerDelegate.applicationLogger,
264                 "Peek in lock table for " + keyspace + "." + table + "." + key);
265         table = table_prepend_name + table;
266         String selectQuery = "select * from " + keyspace + "." + table + " where key='" + key + "' LIMIT 1;";
267         PreparedQueryObject queryObject = new PreparedQueryObject();
268         queryObject.appendQueryString(selectQuery);
269         ResultSet results = dsHandle.executeOneConsistencyGet(queryObject);
270         Row row = results.one();
271         if (row == null || row.isNull("lockReference")) {
272             return new LockObject(false, null, null, null, null);
273         }
274         String lockReference = "" + row.getLong("lockReference");
275         String createTime = row.getString("createTime");
276         String acquireTime = row.getString("acquireTime");
277         LockType locktype = row.isNull("writeLock") || row.getBool("writeLock") ? LockType.WRITE : LockType.READ;
278
279         return new LockObject(true, lockReference, createTime, acquireTime, locktype);
280     }
281
282     public List<String> getCurrentLockHolders(String keyspace, String table, String key)
283             throws MusicServiceException, MusicQueryException {
284         logger.info(EELFLoggerDelegate.applicationLogger,
285                 "Getting lockholders in lock table for " + keyspace + "." + table + "." + key);
286         String origTable = table;
287         table = table_prepend_name + table;
288         String selectQuery = "select * from " + keyspace + "." + table + " where key=?;";
289         List<String> lockHolders = new ArrayList<>();
290         PreparedQueryObject queryObject = new PreparedQueryObject();
291         queryObject.appendQueryString(selectQuery);
292         queryObject.addValue(key);
293         ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
294         boolean topOfQueue = true;
295         StringBuilder lock = new StringBuilder().
296         append("$").append(keyspace).append(".").append(origTable).
297         append(".").append(key).append("$");
298         StringBuilder lockReference = new StringBuilder();
299         for (Row row : rs) {
300                 if ( row.isNull("lockReference") ) {
301                     return lockHolders;
302                 }
303                 lockReference.append(lock).append(row.getLong("lockReference"));
304             if (row.isNull("writeLock") || row.getBool("writeLock")) {
305                 if (topOfQueue) {
306                     lockHolders.add(lockReference.toString());
307                     break;
308                 } else {
309                     break;
310                 }
311             }
312             // read lock
313             lockHolders.add(lockReference.toString());
314
315             topOfQueue = false;
316             lockReference.delete(0,lockReference.length());
317         }
318         return lockHolders;
319     }
320
321     /**
322      * Determine if the lock is a valid current lock holder.
323      * 
324      * @param keyspace
325      * @param table
326      * @param key
327      * @param lockRef
328      * @return true if lockRef is a lock owner of key
329      * @throws MusicServiceException
330      * @throws MusicQueryException
331      */
332     public boolean isLockOwner(String keyspace, String table, String key, String lockRef)
333             throws MusicServiceException, MusicQueryException {
334         logger.info(EELFLoggerDelegate.applicationLogger,
335                 "Checking in lock table for " + keyspace + "." + table + "." + key);
336         table = table_prepend_name + table;
337         String selectQuery = 
338                 "select * from " + keyspace + "." + table + " where key=?;";
339         PreparedQueryObject queryObject = new PreparedQueryObject();
340         queryObject.appendQueryString(selectQuery);
341         queryObject.addValue(key);
342         ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
343
344         boolean topOfQueue = true;
345         for (Row row : rs) {
346             String lockReference = "" + row.getLong("lockReference");
347             if (row.isNull("writeLock") || row.getBool("writeLock")) {
348                 if (topOfQueue && lockRef.equals(lockReference)) {
349                     return true;
350                 } else {
351                     return false;
352                 }
353             }
354             if (lockRef.equals(lockReference)) {
355                 return true;
356             }
357             topOfQueue = false;
358         }
359         logger.info(EELFLoggerDelegate.applicationLogger, "Could not find " + lockRef
360                 + " in the lock queue. It has expired and no longer exists.");
361         return false;
362     }
363
364     /**
365      * Determine if the lock is a valid current lock holder.
366      * 
367      * @param keyspace
368      * @param table
369      * @param key
370      * @param lockRef
371      * @return true if lockRef is a lock owner of key
372      * @throws MusicServiceException
373      * @throws MusicQueryException
374      */
375     public LockObject getLockInfo(String keyspace, String table, String key, String lockRef)
376             throws MusicServiceException, MusicQueryException {
377         logger.info(EELFLoggerDelegate.applicationLogger,
378                 "Checking in lock table for " + keyspace + "." + table + "." + key);
379         String lockQ_table = table_prepend_name + table;
380         String selectQuery = 
381                 "select * from " + keyspace + "." + lockQ_table + " where key=? and lockReference=?;";
382         PreparedQueryObject queryObject = new PreparedQueryObject();
383         queryObject.appendQueryString(selectQuery);
384         queryObject.addValue(key);
385         queryObject.addValue(Long.parseLong(lockRef));
386         ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
387         Row row = rs.one();
388         if (row == null || row.isNull("lockReference")) {
389             return null;
390         }
391
392         String lockReference = "" + row.getLong("lockReference");
393         String createTime = row.getString("createTime");
394         String acquireTime = row.getString("acquireTime");
395         LockType locktype = row.isNull("writeLock") || row.getBool("writeLock") ? LockType.WRITE : LockType.READ;
396         boolean isLockOwner = isLockOwner(keyspace, table, key, lockRef);
397
398         return new LockObject(isLockOwner, lockReference, createTime, acquireTime, locktype);
399     }
400
401
402
403     /**
404      * This method removes the lock ref from the lock table/queue for the key.
405      * 
406      * @param keyspace of the application.
407      * @param table of the application.
408      * @param key is the primary key of the application table
409      * @param lockReference the lock reference that needs to be dequeued.
410      * @throws MusicServiceException
411      * @throws MusicQueryException
412      * @throws MusicLockingException
413      */
414     public void deQueueLockRef(String keyspace, String table, String key, String lockReference, int n)
415             throws MusicServiceException, MusicQueryException, MusicLockingException {
416         String prependTable = table_prepend_name + table;
417         PreparedQueryObject queryObject = new PreparedQueryObject();
418         Long lockReferenceL = Long.parseLong(lockReference.substring(lockReference.lastIndexOf("$") + 1));
419         String deleteQuery = "delete from " + keyspace + "." + prependTable + " where key='" + key
420                 + "' AND lockReference =" + lockReferenceL + " IF EXISTS;";
421         queryObject.appendQueryString(deleteQuery);
422         logger.info(EELFLoggerDelegate.applicationLogger, "Removing lock for key: "+key+ " and reference: "+lockReference);
423         try {
424             dsHandle.executePut(queryObject, "critical");
425             logger.info(EELFLoggerDelegate.applicationLogger,
426                     "Lock removed for key: " + key + " and reference: " + lockReference);
427         } catch (MusicServiceException ex) {
428             logger.error(logger, ex.getMessage(), ex);
429             logger.error(EELFLoggerDelegate.applicationLogger,
430                     "Exception while deQueueLockRef for lockname: " + key + " reference:" + lockReference);
431             if (n > 1) {
432                 logger.info(EELFLoggerDelegate.applicationLogger, "Trying again...");
433                 deQueueLockRef(keyspace, table, key, lockReference, n - 1);
434             } else {
435                 logger.error(EELFLoggerDelegate.applicationLogger,
436                         "deQueueLockRef failed for lockname: " + key + " reference:" + lockReference);
437                 logger.error(logger, ex.getMessage(), ex);
438                 throw new MusicLockingException("Error while deQueueLockRef: " + ex.getMessage());
439             }
440         }
441     }
442
443
444     public void updateLockAcquireTime(String keyspace, String table, String key, String lockReference)
445             throws MusicServiceException, MusicQueryException {
446         table = table_prepend_name + table;
447         PreparedQueryObject queryObject = new PreparedQueryObject();
448         Long lockReferenceL = Long.parseLong(lockReference);
449         String updateQuery = "update " + keyspace + "." + table + " set acquireTime='" + System.currentTimeMillis()
450                 + "' where key='" + key + "' AND lockReference = " + lockReferenceL + " IF EXISTS;";
451         queryObject.appendQueryString(updateQuery);
452
453         //cannot use executePut because we need to ignore music timestamp adjustments for lock store
454         dsHandle.getSession().execute(updateQuery);
455     }  
456
457 }