3c3f7160fbab7baedb3afb7ceb19ac858679314d
[music.git] / src / main / java / org / onap / music / lockingservice / cassandra / CassaLockStore.java
1 /*
2  * ============LICENSE_START==========================================
3  * org.onap.music
4  * ===================================================================
5  *  Copyright (c) 2017 AT&T Intellectual Property
6  *  Modifications Copyright (C) 2019 IBM.
7  * ===================================================================
8  *  Licensed under the Apache License, Version 2.0 (the "License");
9  *  you may not use this file except in compliance with the License.
10  *  You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  *
20  * ============LICENSE_END=============================================
21  * ====================================================================
22  */
23
24 package org.onap.music.lockingservice.cassandra;
25
26 import java.util.ArrayList;
27 import java.util.List;
28
29 import org.onap.music.datastore.MusicDataStore;
30 import org.onap.music.datastore.PreparedQueryObject;
31 import org.onap.music.eelf.logging.EELFLoggerDelegate;
32 import org.onap.music.exceptions.MusicLockingException;
33 import org.onap.music.exceptions.MusicQueryException;
34 import org.onap.music.exceptions.MusicServiceException;
35 import org.onap.music.main.MusicUtil;
36
37 import com.datastax.driver.core.ResultSet;
38 import com.datastax.driver.core.Row;
39
40 /*
41  * This is the lock store that is built on top of Cassandra that is used by MUSIC to maintain lock state. 
42  */
43
44 public class CassaLockStore {
45     
46     private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(CassaLockStore.class);
47     private static String table_prepend_name = "lockQ_";
48     private MusicDataStore dsHandle;
49     
50     public CassaLockStore() {
51         dsHandle = new MusicDataStore();
52     }
53     
54     public CassaLockStore(MusicDataStore dsHandle) {
55         this.dsHandle=dsHandle;
56     }
57     public class LockObject{
58         private boolean isLockOwner;
59         private String lockRef;
60         private String createTime;
61         private String acquireTime;
62         private LockType locktype;
63         public LockObject(boolean isLockOwner, String lockRef, String createTime, String acquireTime, LockType locktype) {
64             this.setIsLockOwner(isLockOwner);
65             this.setLockRef(lockRef);
66             this.setAcquireTime(acquireTime);
67             this.setCreateTime(createTime);
68             this.setLocktype(locktype);
69         }
70         public boolean getIsLockOwner() {
71             return isLockOwner;
72         }
73         public void setIsLockOwner(boolean isLockOwner) {
74             this.isLockOwner = isLockOwner;
75         }
76         public String getAcquireTime() {
77             return acquireTime;
78         }
79         public void setAcquireTime(String acquireTime) {
80             this.acquireTime = acquireTime;
81         }
82         public String getCreateTime() {
83             return createTime;
84         }
85         public void setCreateTime(String createTime) {
86             this.createTime = createTime;
87         }
88         public String getLockRef() {
89             return lockRef;
90         }
91         public void setLockRef(String lockRef) {
92             this.lockRef = lockRef;
93         }
94         public LockType getLocktype() {
95             return locktype;
96         }
97         public void setLocktype(LockType locktype) {
98             this.locktype = locktype;
99         }
100     }
101     
102     /**
103      * 
104      * This method creates a shadow locking table for every main table in Cassandra. This table tracks all information regarding locks. 
105      * @param keyspace of the application. 
106      * @param table of the application. 
107      * @return true if the operation was successful.
108      * @throws MusicServiceException
109      * @throws MusicQueryException
110      */
111     public boolean createLockQueue(String keyspace, String table) throws MusicServiceException, MusicQueryException {
112         logger.info(EELFLoggerDelegate.applicationLogger,
113                 "Create lock queue/table for " +  keyspace+"."+table);
114         table = table_prepend_name+table;
115         String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
116                 + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, "
117                 + "writeLock boolean, PRIMARY KEY ((key), lockReference) ) "
118                 + "WITH CLUSTERING ORDER BY (lockReference ASC);";
119         PreparedQueryObject queryObject = new PreparedQueryObject();
120         
121         queryObject.appendQueryString(tabQuery);
122         boolean result;
123         result = dsHandle.executePut(queryObject, "eventual");
124         return result;
125     }
126
127     /**
128      * This method creates a lock reference for each invocation. The lock references are monotonically increasing timestamps.
129      * @param keyspace of the locks.
130      * @param table of the locks.
131      * @param lockName is the primary key of the lock table
132      * @return the UUID lock reference.
133      * @throws MusicServiceException
134      * @throws MusicQueryException
135      */
136     public String genLockRefandEnQueue(String keyspace, String table, String lockName, LockType locktype) throws MusicServiceException, MusicQueryException, MusicLockingException {
137         return genLockRefandEnQueue(keyspace, table, lockName, locktype, 0);
138     }
139     
140     private String genLockRefandEnQueue(String keyspace, String table, String lockName, LockType locktype, int count) throws MusicServiceException, MusicQueryException, MusicLockingException {
141         logger.info(EELFLoggerDelegate.applicationLogger,
142                 "Create " + locktype + " lock reference for " +  keyspace + "." + table + "." + lockName);
143         String lockTable ="";
144         lockTable = table_prepend_name+table;
145     
146
147
148         PreparedQueryObject queryObject = new PreparedQueryObject();
149         String selectQuery = "SELECT guard FROM " + keyspace + "." + lockTable + " WHERE key=?;";
150
151         queryObject.addValue(lockName);
152         queryObject.appendQueryString(selectQuery);
153         ResultSet gqResult = dsHandle.executeOneConsistencyGet(queryObject);
154         List<Row> latestGuardRow = gqResult.all();
155
156         long prevGuard = 0;
157         long lockRef = 1;
158         if (!latestGuardRow.isEmpty()) {
159             prevGuard = latestGuardRow.get(0).getLong(0);
160             lockRef = prevGuard + 1;
161         }
162
163         long lockEpochMillis = System.currentTimeMillis();
164
165         logger.info(EELFLoggerDelegate.applicationLogger,
166                 "Created lock reference for " +  keyspace + "." + lockTable + "." + lockName + ":" + lockRef);
167
168         queryObject = new PreparedQueryObject();
169         String insQuery = "BEGIN BATCH" +
170                 " UPDATE " + keyspace + "." + lockTable +
171                 " SET guard=? WHERE key=? IF guard = " + (prevGuard == 0 ? "NULL" : "?") +";" +
172                 " INSERT INTO " + keyspace + "." + lockTable +
173                 "(key, lockReference, createTime, acquireTime, writeLock) VALUES (?,?,?,?,?) IF NOT EXISTS; APPLY BATCH;";
174
175         queryObject.addValue(lockRef);
176         queryObject.addValue(lockName);
177         if (prevGuard != 0)
178             queryObject.addValue(prevGuard);
179
180         queryObject.addValue(lockName);
181         queryObject.addValue(lockRef);
182         queryObject.addValue(String.valueOf(lockEpochMillis));
183         queryObject.addValue("0");
184         queryObject.addValue(locktype==LockType.WRITE ? true : false );
185         queryObject.appendQueryString(insQuery);
186         boolean pResult = dsHandle.executePut(queryObject, "critical");
187         if (!pResult) {// couldn't create lock ref, retry
188             count++;
189             if (count > MusicUtil.getRetryCount()) {
190                 logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to create lock reference");
191                 throw new MusicLockingException("Unable to create lock reference");
192             }
193             return genLockRefandEnQueue(keyspace, table, lockName, locktype, count);
194         }
195         return "$" + keyspace + "." + table + "." + lockName + "$" + String.valueOf(lockRef);
196     }
197     
198     
199
200     /**
201      * Returns a result set containing the list of clients waiting for a particular lock
202      * 
203      * @param keyspace
204      * @param table
205      * @param key
206      * @return list of lockrefs in the queue
207      * @throws MusicServiceException
208      * @throws MusicQueryException
209      */
210     public List<String> getLockQueue(String keyspace, String table, String key)
211             throws MusicServiceException, MusicQueryException {
212         logger.info(EELFLoggerDelegate.applicationLogger,
213                 "Getting the queue for " + keyspace + "." + table + "." + key);
214         table = table_prepend_name + table;
215         String selectQuery = "select * from " + keyspace + "." + table + " where key='" + key + "';";
216         PreparedQueryObject queryObject = new PreparedQueryObject();
217         queryObject.appendQueryString(selectQuery);
218         ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
219         ArrayList<String> lockQueue = new ArrayList<>();
220         for (Row row : rs) {
221             lockQueue.add(Long.toString(row.getLong("lockReference")));
222         }
223         return lockQueue;
224     }
225
226
227     /**
228      * Returns a result set containing the list of clients waiting for a particular lock
229      * 
230      * @param keyspace
231      * @param table
232      * @param key
233      * @return size of lockrefs queue
234      * @throws MusicServiceException
235      * @throws MusicQueryException
236      */
237     public long getLockQueueSize(String keyspace, String table, String key)
238             throws MusicServiceException, MusicQueryException {
239         logger.info(EELFLoggerDelegate.applicationLogger,
240                 "Getting the queue size for " + keyspace + "." + table + "." + key);
241         table = table_prepend_name + table;
242         String selectQuery = "select count(*) from " + keyspace + "." + table + " where key='" + key + "';";
243         PreparedQueryObject queryObject = new PreparedQueryObject();
244         queryObject.appendQueryString(selectQuery);
245                 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
246                 return rs.one().getLong("count");
247         }
248
249
250     /**
251      * This method returns the top of lock table/queue for the key.
252      * 
253      * @param keyspace of the application.
254      * @param table of the application.
255      * @param key is the primary key of the application table
256      * @return the UUID lock reference. Returns LockObject.isLockOwner=false if there is no owner or the
257      *         lock doesn't exist
258      * @throws MusicServiceException
259      * @throws MusicQueryException
260      */
261     public LockObject peekLockQueue(String keyspace, String table, String key)
262             throws MusicServiceException, MusicQueryException {
263         logger.info(EELFLoggerDelegate.applicationLogger,
264                 "Peek in lock table for " + keyspace + "." + table + "." + key);
265         table = table_prepend_name + table;
266         String selectQuery = "select * from " + keyspace + "." + table + " where key='" + key + "' LIMIT 1;";
267         PreparedQueryObject queryObject = new PreparedQueryObject();
268         queryObject.appendQueryString(selectQuery);
269         ResultSet results = dsHandle.executeOneConsistencyGet(queryObject);
270         Row row = results.one();
271         if (row == null || row.isNull("lockReference")) {
272             return new LockObject(false, null, null, null, null);
273         }
274         String lockReference = "" + row.getLong("lockReference");
275         String createTime = row.getString("createTime");
276         String acquireTime = row.getString("acquireTime");
277         LockType locktype = row.getBool("writeLock") ? LockType.WRITE : LockType.READ;
278
279         return new LockObject(true, lockReference, createTime, acquireTime, locktype);
280     }
281
282     public List<String> getCurrentLockHolders(String keyspace, String table, String key)
283             throws MusicServiceException, MusicQueryException {
284         logger.info(EELFLoggerDelegate.applicationLogger,
285                 "Getting lockholders in lock table for " + keyspace + "." + table + "." + key);
286         table = table_prepend_name + table;
287         String selectQuery = "select * from " + keyspace + "." + table + " where key=?;";
288         PreparedQueryObject queryObject = new PreparedQueryObject();
289         queryObject.appendQueryString(selectQuery);
290         queryObject.addValue(key);
291         ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
292
293         List<String> lockHolders = new ArrayList<>();
294         boolean topOfQueue = true;
295         for (Row row : rs) {
296             String lockReference = "" + row.getLong("lockReference");
297             if (row.getBool("writeLock")) {
298                 if (topOfQueue) {
299                     lockHolders.add(lockReference);
300                     break;
301                 } else {
302                     break;
303                 }
304             }
305             // read lock
306             lockHolders.add(lockReference);
307
308             topOfQueue = false;
309         }
310         return lockHolders;
311     }
312
313     /**
314      * Determine if the lock is a valid current lock holder.
315      * 
316      * @param keyspace
317      * @param table
318      * @param key
319      * @param lockRef
320      * @return true if lockRef is a lock owner of key
321      * @throws MusicServiceException
322      * @throws MusicQueryException
323      */
324     public boolean isLockOwner(String keyspace, String table, String key, String lockRef)
325             throws MusicServiceException, MusicQueryException {
326         logger.info(EELFLoggerDelegate.applicationLogger,
327                 "Checking in lock table for " + keyspace + "." + table + "." + key);
328         table = table_prepend_name + table;
329         String selectQuery = 
330                 "select * from " + keyspace + "." + table + " where key=?;";
331         PreparedQueryObject queryObject = new PreparedQueryObject();
332         queryObject.appendQueryString(selectQuery);
333         queryObject.addValue(key);
334         ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
335
336         boolean topOfQueue = true;
337         for (Row row : rs) {
338                 String lockReference = "" + row.getLong("lockReference");
339             if (row.getBool("writeLock")) {
340                 if (topOfQueue && lockRef.equals(lockReference)) {
341                         return true;
342                 } else {
343                         return false;
344                 }
345             }
346             if (lockRef.equals(lockReference)) {
347                 return true;
348             }
349             topOfQueue = false;
350         }
351         logger.info(EELFLoggerDelegate.applicationLogger, "Could not find " + lockRef
352                 + " in the lock queue. It has expired and no longer exists.");
353         return false;
354     }
355
356     /**
357      * Determine if the lock is a valid current lock holder.
358      * 
359      * @param keyspace
360      * @param table
361      * @param key
362      * @param lockRef
363      * @return true if lockRef is a lock owner of key
364      * @throws MusicServiceException
365      * @throws MusicQueryException
366      */
367     public LockObject getLockInfo(String keyspace, String table, String key, String lockRef)
368             throws MusicServiceException, MusicQueryException {
369         logger.info(EELFLoggerDelegate.applicationLogger,
370                 "Checking in lock table for " + keyspace + "." + table + "." + key);
371         String lockQ_table = table_prepend_name + table;
372         String selectQuery = 
373                 "select * from " + keyspace + "." + lockQ_table + " where key=? and lockReference=?;";
374         PreparedQueryObject queryObject = new PreparedQueryObject();
375         queryObject.appendQueryString(selectQuery);
376         queryObject.addValue(key);
377         queryObject.addValue(Long.parseLong(lockRef));
378         ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
379         Row row = rs.one();
380         if (row == null || row.isNull("lockReference")) {
381             return null;
382         }
383
384         String lockReference = "" + row.getLong("lockReference");
385         String createTime = row.getString("createTime");
386         String acquireTime = row.getString("acquireTime");
387         LockType locktype = row.getBool("writeLock") ? LockType.WRITE : LockType.READ;
388         boolean isLockOwner = isLockOwner(keyspace, table, key, lockRef);
389
390         return new LockObject(isLockOwner, lockReference, createTime, acquireTime, locktype);
391     }
392
393
394
395     /**
396      * This method removes the lock ref from the lock table/queue for the key.
397      * 
398      * @param keyspace of the application.
399      * @param table of the application.
400      * @param key is the primary key of the application table
401      * @param lockReference the lock reference that needs to be dequeued.
402      * @throws MusicServiceException
403      * @throws MusicQueryException
404      * @throws MusicLockingException
405      */
406     public void deQueueLockRef(String keyspace, String table, String key, String lockReference, int n)
407             throws MusicServiceException, MusicQueryException, MusicLockingException {
408         String prependTable = table_prepend_name + table;
409         PreparedQueryObject queryObject = new PreparedQueryObject();
410         Long lockReferenceL = Long.parseLong(lockReference.substring(lockReference.lastIndexOf("$") + 1));
411         String deleteQuery = "delete from " + keyspace + "." + prependTable + " where key='" + key
412                 + "' AND lockReference =" + lockReferenceL + " IF EXISTS;";
413         queryObject.appendQueryString(deleteQuery);
414         logger.info(EELFLoggerDelegate.applicationLogger, "Removing lock for key: "+key+ " and reference: "+lockReference);
415         try {
416             dsHandle.executePut(queryObject, "critical");
417             logger.info(EELFLoggerDelegate.applicationLogger,
418                     "Lock removed for key: " + key + " and reference: " + lockReference);
419         } catch (MusicServiceException ex) {
420             logger.error(logger, ex.getMessage(), ex);
421             logger.error(EELFLoggerDelegate.applicationLogger,
422                     "Exception while deQueueLockRef for lockname: " + key + " reference:" + lockReference);
423             if (n > 1) {
424                 logger.info(EELFLoggerDelegate.applicationLogger, "Trying again...");
425                 deQueueLockRef(keyspace, table, key, lockReference, n - 1);
426             } else {
427                 logger.error(EELFLoggerDelegate.applicationLogger,
428                         "deQueueLockRef failed for lockname: " + key + " reference:" + lockReference);
429                 logger.error(logger, ex.getMessage(), ex);
430                 throw new MusicLockingException("Error while deQueueLockRef: " + ex.getMessage());
431             }
432         }
433     }
434
435
436     public void updateLockAcquireTime(String keyspace, String table, String key, String lockReference)
437             throws MusicServiceException, MusicQueryException {
438         table = table_prepend_name + table;
439         PreparedQueryObject queryObject = new PreparedQueryObject();
440         Long lockReferenceL = Long.parseLong(lockReference);
441         String updateQuery = "update " + keyspace + "." + table + " set acquireTime='" + System.currentTimeMillis()
442                 + "' where key='" + key + "' AND lockReference = " + lockReferenceL + " IF EXISTS;";
443         queryObject.appendQueryString(updateQuery);
444         dsHandle.executePut(queryObject, "eventual");
445
446     }  
447
448 }