Merge "Update junits"
[music.git] / music-core / src / main / java / org / onap / music / lockingservice / cassandra / CassaLockStore.java
1 /*
2  * ============LICENSE_START==========================================
3  * org.onap.music
4  * ===================================================================
5  *  Copyright (c) 2017 AT&T Intellectual Property
6  *  Modifications Copyright (C) 2019 IBM.
7  * ===================================================================
8  *  Licensed under the Apache License, Version 2.0 (the "License");
9  *  you may not use this file except in compliance with the License.
10  *  You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  *
20  * ============LICENSE_END=============================================
21  * ====================================================================
22  */
23
24 package org.onap.music.lockingservice.cassandra;
25
26 import java.util.ArrayList;
27 import java.util.Iterator;
28 import java.util.List;
29
30 import org.onap.music.datastore.MusicDataStore;
31 import org.onap.music.datastore.PreparedQueryObject;
32 import org.onap.music.eelf.logging.EELFLoggerDelegate;
33 import org.onap.music.exceptions.MusicLockingException;
34 import org.onap.music.exceptions.MusicQueryException;
35 import org.onap.music.exceptions.MusicServiceException;
36 import org.onap.music.main.DeadlockDetectionUtil;
37 import org.onap.music.main.DeadlockDetectionUtil.OwnershipType;
38 import org.onap.music.main.MusicUtil;
39 import org.onap.music.main.ResultType;
40 import org.onap.music.main.ReturnType;
41 import com.datastax.driver.core.ResultSet;
42 import com.datastax.driver.core.Row;
43 import com.datastax.driver.core.Session;
44 import com.datastax.driver.extras.codecs.enums.EnumNameCodec;
45
46 /*
47  * This is the lock store that is built on top of Cassandra that is used by MUSIC to maintain lock state. 
48  */
49
50 public class CassaLockStore {
51     
52     private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(CassaLockStore.class);
53     private static String table_prepend_name = "lockQ_";
54     private MusicDataStore dsHandle;
55     
56     public CassaLockStore() {
57         dsHandle = new MusicDataStore();
58     }
59     
60     public CassaLockStore(MusicDataStore dsHandle) {
61         this.dsHandle=dsHandle;
62     }
63     public class LockObject{
64         private boolean isLockOwner;
65         private String lockRef;
66         private String createTime;
67         private String acquireTime;
68         private LockType locktype;
69         // Owner is the self-declared client which "owns" this row. It is used for deadlock detection.  It is not (directly) related to isLockOwner.
70         private String owner;
71         public LockObject(boolean isLockOwner, String lockRef, String createTime, String acquireTime, LockType locktype, String owner) {
72             this.setIsLockOwner(isLockOwner);
73             this.setLockRef(lockRef);
74             this.setAcquireTime(acquireTime);
75             this.setCreateTime(createTime);
76             this.setLocktype(locktype);
77             this.setOwner(owner);
78         }
79         public boolean getIsLockOwner() {
80             return isLockOwner;
81         }
82         public void setIsLockOwner(boolean isLockOwner) {
83             this.isLockOwner = isLockOwner;
84         }
85         public String getAcquireTime() {
86             return acquireTime;
87         }
88         public void setAcquireTime(String acquireTime) {
89             this.acquireTime = acquireTime;
90         }
91         public String getCreateTime() {
92             return createTime;
93         }
94         public void setCreateTime(String createTime) {
95             this.createTime = createTime;
96         }
97         public String getLockRef() {
98             return lockRef;
99         }
100         public void setLockRef(String lockRef) {
101             this.lockRef = lockRef;
102         }
103         public LockType getLocktype() {
104             return locktype;
105         }
106         public void setLocktype(LockType locktype) {
107             this.locktype = locktype;
108         }
109         public String getOwner() {
110             return owner;
111         }
112         public void setOwner(String owner) {
113             this.owner = owner;
114         }
115     }
116     
117     /**
118      * 
119      * This method creates a shadow locking table for every main table in Cassandra. This table tracks all information regarding locks. 
120      * @param keyspace of the application. 
121      * @param table of the application. 
122      * @return true if the operation was successful.
123      * @throws MusicServiceException
124      * @throws MusicQueryException
125      */
126     public boolean createLockQueue(String keyspace, String table) throws MusicServiceException, MusicQueryException {
127         logger.info(EELFLoggerDelegate.applicationLogger,
128                 "Create lock queue/table for " +  keyspace+"."+table);
129         table = table_prepend_name+table;
130         String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
131                 + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, "
132                 + "lockType text, leasePeriodTime bigint, owner text, PRIMARY KEY ((key), lockReference) ) "
133                 + "WITH CLUSTERING ORDER BY (lockReference ASC);";
134         PreparedQueryObject queryObject = new PreparedQueryObject();
135         
136         queryObject.appendQueryString(tabQuery);
137         boolean result;
138         result = dsHandle.executePut(queryObject, "eventual");
139         return result;
140     }
141
142     /**
143      * This method creates a lock reference for each invocation. The lock references are monotonically increasing timestamps.
144      * @param keyspace of the locks.
145      * @param table of the locks.
146      * @param lockName is the primary key of the lock table
147      * @param lockType is the type of lock (read/write)
148      * @param owner is the owner of the lock (optional, for deadlock detection)
149      * @return the UUID lock reference.
150      * @throws MusicServiceException
151      * @throws MusicQueryException
152      */
153     public String genLockRefandEnQueue(String keyspace, String table, String lockName, LockType locktype, String owner) throws MusicServiceException, MusicQueryException, MusicLockingException {
154         return genLockRefandEnQueue(keyspace, table, lockName, locktype, owner, 0);
155     }
156     
157     private String genLockRefandEnQueue(String keyspace, String table, String lockName, LockType locktype, String owner, int count) throws MusicServiceException, MusicQueryException, MusicLockingException {
158         logger.info(EELFLoggerDelegate.applicationLogger,
159                 "Create " + locktype + " lock reference for " +  keyspace + "." + table + "." + lockName);
160         String lockTable ="";
161         lockTable = table_prepend_name+table;
162     
163         PreparedQueryObject queryObject = new PreparedQueryObject();
164         String selectQuery = "SELECT guard FROM " + keyspace + "." + lockTable + " WHERE key=?;";
165
166         queryObject.addValue(lockName);
167         queryObject.appendQueryString(selectQuery);
168         ResultSet gqResult = dsHandle.executeOneConsistencyGet(queryObject);
169         List<Row> latestGuardRow = gqResult.all();
170
171         long prevGuard = 0;
172         long lockRef = 1;
173         if (!latestGuardRow.isEmpty()) {
174             prevGuard = latestGuardRow.get(0).getLong(0);
175             lockRef = prevGuard + 1;
176         }
177
178         long lockEpochMillis = System.currentTimeMillis();
179
180         logger.info(EELFLoggerDelegate.applicationLogger,
181                 "Created lock reference for " +  keyspace + "." + lockTable + "." + lockName + ":" + lockRef);
182         
183         queryObject = new PreparedQueryObject();
184         
185         String insQuery = "BEGIN BATCH" +
186                 " UPDATE " + keyspace + "." + lockTable +
187                 " SET guard=? WHERE key=? IF guard = " + (prevGuard == 0 ? "NULL" : "?") +";" +
188                 " INSERT INTO " + keyspace + "." + lockTable +
189                 "(key, lockReference, createTime, acquireTime, lockType, owner) VALUES (?,?,?,?,?,?) IF NOT EXISTS; APPLY BATCH;";
190
191         queryObject.addValue(lockRef);
192         queryObject.addValue(lockName);
193         if (prevGuard != 0)
194             queryObject.addValue(prevGuard);
195
196         queryObject.addValue(lockName);
197         queryObject.addValue(lockRef);
198         queryObject.addValue(String.valueOf(lockEpochMillis));
199         queryObject.addValue("0");
200         queryObject.addValue(locktype);
201         queryObject.addValue(owner);
202         queryObject.appendQueryString(insQuery);
203         boolean pResult = dsHandle.executePut(queryObject, "critical");
204         if (!pResult) {// couldn't create lock ref, retry
205             count++;
206             if (count > MusicUtil.getRetryCount()) {
207                 logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to create lock reference");
208                 throw new MusicLockingException("Unable to create lock reference");
209             }
210             return genLockRefandEnQueue(keyspace, table, lockName, locktype, owner, count);
211         }
212         return "$" + keyspace + "." + table + "." + lockName + "$" + String.valueOf(lockRef);
213     }
214
215         /**
216      * Returns a result set containing the list of clients waiting for a particular lock
217      * 
218      * @param keyspace
219      * @param table
220      * @param key
221      * @return list of lockrefs in the queue
222      * @throws MusicServiceException
223      * @throws MusicQueryException
224      */
225     public List<String> getLockQueue(String keyspace, String table, String key)
226             throws MusicServiceException, MusicQueryException {
227         logger.info(EELFLoggerDelegate.applicationLogger,
228                 "Getting the queue for " + keyspace + "." + table + "." + key);
229         table = table_prepend_name + table;
230         String selectQuery = "select * from " + keyspace + "." + table + " where key='" + key + "';";
231         PreparedQueryObject queryObject = new PreparedQueryObject();
232         queryObject.appendQueryString(selectQuery);
233         ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
234         ArrayList<String> lockQueue = new ArrayList<>();
235         for (Row row : rs) {
236             lockQueue.add(Long.toString(row.getLong("lockReference")));
237         }
238         return lockQueue;
239     }
240
241
242     /**
243      * Returns a result set containing the list of clients waiting for a particular lock
244      * 
245      * @param keyspace
246      * @param table
247      * @param key
248      * @return size of lockrefs queue
249      * @throws MusicServiceException
250      * @throws MusicQueryException
251      */
252     public long getLockQueueSize(String keyspace, String table, String key)
253             throws MusicServiceException, MusicQueryException {
254         logger.info(EELFLoggerDelegate.applicationLogger,
255                 "Getting the queue size for " + keyspace + "." + table + "." + key);
256         table = table_prepend_name + table;
257         String selectQuery = "select count(*) from " + keyspace + "." + table + " where key='" + key + "';";
258         PreparedQueryObject queryObject = new PreparedQueryObject();
259         queryObject.appendQueryString(selectQuery);
260                 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
261                 return rs.one().getLong("count");
262         }
263
264
265     /**
266      * This method returns the top of lock table/queue for the key.
267      * 
268      * @param keyspace of the application.
269      * @param table of the application.
270      * @param key is the primary key of the application table
271      * @return the UUID lock reference. Returns LockObject.isLockOwner=false if there is no owner or the
272      *         lock doesn't exist
273      * @throws MusicServiceException
274      * @throws MusicQueryException
275      */
276     public LockObject peekLockQueue(String keyspace, String table, String key)
277             throws MusicServiceException, MusicQueryException {
278         logger.info(EELFLoggerDelegate.applicationLogger,
279                 "Peek in lock table for " + keyspace + "." + table + "." + key);
280         table = table_prepend_name + table;
281         String selectQuery = "select * from " + keyspace + "." + table + " where key='" + key + "' LIMIT 1;";
282         PreparedQueryObject queryObject = new PreparedQueryObject();
283         queryObject.appendQueryString(selectQuery);
284         ResultSet results = dsHandle.executeOneConsistencyGet(queryObject);
285         Row row = results.one();
286         if (row == null || row.isNull("lockReference")) {
287             return new LockObject(false, null, null, null, null, null);
288         }
289         String lockReference = "" + row.getLong("lockReference");
290         String createTime = row.getString("createTime");
291         String acquireTime = row.getString("acquireTime");
292         LockType locktype = row.get("lockType", LockType.class);
293         String owner = row.getString("owner");
294
295         return new LockObject(true, lockReference, createTime, acquireTime, locktype, owner);
296     }
297
298     public List<String> getCurrentLockHolders(String keyspace, String table, String key)
299             throws MusicServiceException, MusicQueryException {
300         logger.info(EELFLoggerDelegate.applicationLogger,
301                 "Getting lockholders in lock table for " + keyspace + "." + table + "." + key);
302         String origTable = table;
303         table = table_prepend_name + table;
304         String selectQuery = "select * from " + keyspace + "." + table + " where key=?;";
305         List<String> lockHolders = new ArrayList<>();
306         PreparedQueryObject queryObject = new PreparedQueryObject();
307         queryObject.appendQueryString(selectQuery);
308         queryObject.addValue(key);
309         ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
310         boolean topOfQueue = true;
311         StringBuilder lock = new StringBuilder().
312         append("$").append(keyspace).append(".").append(origTable).
313         append(".").append(key).append("$");
314         StringBuilder lockReference = new StringBuilder();
315         for (Row row : rs) {
316                 if ( row.isNull("lockReference") ) {
317                     return lockHolders;
318                 }
319                 lockReference.append(lock).append(row.getLong("lockReference"));
320             if (row.get("lockType", LockType.class)!=LockType.WRITE) {
321                 if (topOfQueue) {
322                     lockHolders.add(lockReference.toString());
323                     break;
324                 } else {
325                     break;
326                 }
327             }
328             // read lock
329             lockHolders.add(lockReference.toString());
330
331             topOfQueue = false;
332             lockReference.delete(0,lockReference.length());
333         }
334         return lockHolders;
335     }
336
337     /**
338      * Determine if the lock is a valid current lock holder.
339      * 
340      * @param keyspace
341      * @param table
342      * @param key
343      * @param lockRef
344      * @return true if lockRef is a lock owner of key
345      * @throws MusicServiceException
346      * @throws MusicQueryException
347      */
348     public boolean isLockOwner(String keyspace, String table, String key, String lockRef)
349             throws MusicServiceException, MusicQueryException {
350         logger.info(EELFLoggerDelegate.applicationLogger,
351                 "Checking in lock table for " + keyspace + "." + table + "." + key);
352         table = table_prepend_name + table;
353         String selectQuery = 
354                 "select * from " + keyspace + "." + table + " where key=?;";
355         PreparedQueryObject queryObject = new PreparedQueryObject();
356         queryObject.appendQueryString(selectQuery);
357         queryObject.addValue(key);
358         ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
359
360         boolean topOfQueue = true;
361         for (Row row : rs) {
362             String lockReference = "" + row.getLong("lockReference");
363             if (row.get("lockType", LockType.class)==LockType.WRITE) {
364                 if (topOfQueue && lockRef.equals(lockReference)) {
365                     return true;
366                 } else {
367                     return false;
368                 }
369             }
370             if (lockRef.equals(lockReference)) {
371                 return true;
372             }
373             topOfQueue = false;
374         }
375         logger.info(EELFLoggerDelegate.applicationLogger, "Could not find " + lockRef
376                 + " in the lock queue. It has expired and no longer exists.");
377         return false;
378     }
379
380     /**
381      * Determine if the lock is a valid current lock holder.
382      * 
383      * @param keyspace
384      * @param table
385      * @param key
386      * @param lockRef
387      * @return true if lockRef is a lock owner of key
388      * @throws MusicServiceException
389      * @throws MusicQueryException
390      */
391     public LockObject getLockInfo(String keyspace, String table, String key, String lockRef)
392             throws MusicServiceException, MusicQueryException {
393         logger.info(EELFLoggerDelegate.applicationLogger,
394                 "Checking in lock table for " + keyspace + "." + table + "." + key);
395         String lockQ_table = table_prepend_name + table;
396         String selectQuery = 
397                 "select * from " + keyspace + "." + lockQ_table + " where key=? and lockReference=?;";
398         PreparedQueryObject queryObject = new PreparedQueryObject();
399         queryObject.appendQueryString(selectQuery);
400         queryObject.addValue(key);
401         queryObject.addValue(Long.parseLong(lockRef));
402         ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
403         Row row = rs.one();
404         if (row == null || row.isNull("lockReference")) {
405             return null;
406         }
407
408         String lockReference = "" + row.getLong("lockReference");
409         String createTime = row.getString("createTime");
410         String acquireTime = row.getString("acquireTime");
411         LockType locktype = row.get("lockType", LockType.class);
412         boolean isLockOwner = isLockOwner(keyspace, table, key, lockRef);
413         String owner = row.getString("owner");
414
415         return new LockObject(isLockOwner, lockReference, createTime, acquireTime, locktype, owner);
416     }
417
418
419
420     /**
421      * This method removes the lock ref from the lock table/queue for the key.
422      * 
423      * @param keyspace of the application.
424      * @param table of the application.
425      * @param key is the primary key of the application table
426      * @param lockReference the lock reference that needs to be dequeued.
427      * @throws MusicServiceException
428      * @throws MusicQueryException
429      * @throws MusicLockingException
430      */
431     public void deQueueLockRef(String keyspace, String table, String key, String lockReference, int n)
432             throws MusicServiceException, MusicQueryException, MusicLockingException {
433         String prependTable = table_prepend_name + table;
434         PreparedQueryObject queryObject = new PreparedQueryObject();
435         Long lockReferenceL = Long.parseLong(lockReference.substring(lockReference.lastIndexOf("$") + 1));
436         String deleteQuery = "delete from " + keyspace + "." + prependTable + " where key='" + key
437                 + "' AND lockReference =" + lockReferenceL + " IF EXISTS;";
438         queryObject.appendQueryString(deleteQuery);
439         logger.info(EELFLoggerDelegate.applicationLogger, "Removing lock for key: "+key+ " and reference: "+lockReference);
440         try {
441             dsHandle.executePut(queryObject, "critical");
442             logger.info(EELFLoggerDelegate.applicationLogger,
443                     "Lock removed for key: " + key + " and reference: " + lockReference);
444         } catch (MusicServiceException ex) {
445             logger.error(logger, ex.getMessage(), ex);
446             logger.error(EELFLoggerDelegate.applicationLogger,
447                     "Exception while deQueueLockRef for lockname: " + key + " reference:" + lockReference);
448             if (n > 1) {
449                 logger.info(EELFLoggerDelegate.applicationLogger, "Trying again...");
450                 deQueueLockRef(keyspace, table, key, lockReference, n - 1);
451             } else {
452                 logger.error(EELFLoggerDelegate.applicationLogger,
453                         "deQueueLockRef failed for lockname: " + key + " reference:" + lockReference);
454                 logger.error(logger, ex.getMessage(), ex);
455                 throw new MusicLockingException("Error while deQueueLockRef: " + ex.getMessage());
456             }
457         }
458     }
459
460
461     public void updateLockAcquireTime(String keyspace, String table, String key, String lockReference) {
462         table = table_prepend_name + table;
463         Long lockReferenceL = Long.parseLong(lockReference);
464         String updateQuery = "update " + keyspace + "." + table + " set acquireTime='" + System.currentTimeMillis()
465                 + "' where key='" + key + "' AND lockReference = " + lockReferenceL + " IF EXISTS;";
466
467         //cannot use executePut because we need to ignore music timestamp adjustments for lock store
468         dsHandle.getSession().execute(updateQuery);
469     }  
470
471     public boolean checkForDeadlock(String keyspace, String table, String lockName, LockType locktype, String owner, boolean forAcquire) throws MusicServiceException, MusicQueryException {
472         if (locktype.equals(LockType.READ)) return false;
473         if (owner==null || owner.length()==0) return false;
474
475         String lockTable = table_prepend_name + table;
476         PreparedQueryObject queryObject = new PreparedQueryObject();
477         queryObject.appendQueryString("SELECT key, acquiretime, owner FROM " + keyspace + "." + lockTable);
478         queryObject.appendQueryString(" WHERE lockType = ? ALLOW FILTERING");
479         queryObject.addValue(LockType.WRITE);
480
481         DeadlockDetectionUtil ddu = getDeadlockDetectionUtil();
482
483         ResultSet rs = dsHandle.executeLocalQuorumConsistencyGet(queryObject);
484         logger.debug("rs has " + rs.getAvailableWithoutFetching() + (rs.isFullyFetched()?"":" (or more!)") );
485         Iterator<Row> it = rs.iterator();
486         while (it.hasNext()) {
487             Row row = it.next();
488             logger.debug("key = " + row.getString("key") + ", time = " + row.getString("acquiretime") + ", owner = " + row.getString("owner") );
489             ddu.setExisting(row.getString("key"), row.getString("owner"), ("0".equals(row.getString("acquiretime")))?OwnershipType.CREATED:OwnershipType.ACQUIRED);
490         }
491         boolean deadlock = ddu.checkForDeadlock(lockName, owner, forAcquire?OwnershipType.ACQUIRED:OwnershipType.CREATED);
492         if (deadlock) logger.warn("Deadlock detected when " + owner + " tried to create lock on " + keyspace + "." + lockTable + "." + lockName);
493         return deadlock;
494     }
495
496     /**
497      * This is used for testing purpose
498      * @return new DeadlockDetectionUtil object
499      */
500     DeadlockDetectionUtil getDeadlockDetectionUtil() {
501         return new DeadlockDetectionUtil();
502     }
503     
504     public List<String> getAllLocksForOwner(String ownerId, String keyspace, String table) throws MusicServiceException, MusicQueryException {
505         List<String> toRet = new ArrayList<String>();
506         String lockTable = table_prepend_name + table;
507         PreparedQueryObject queryObject = new PreparedQueryObject();
508         queryObject.appendQueryString("SELECT key, lockreference FROM " + keyspace + "." + lockTable);
509         queryObject.appendQueryString(" WHERE owner = '" + ownerId + "' ALLOW FILTERING");
510
511         ResultSet rs = dsHandle.executeQuorumConsistencyGet(queryObject);
512         Iterator<Row> it = rs.iterator();
513         while (it.hasNext()) {
514             Row row = it.next();
515             toRet.add(row.getString("key") + "$" + row.getLong("lockreference"));
516         }
517         return toRet;
518     }
519
520     public ReturnType promoteLock(String keyspace, String table, String key, String lockRef)
521             throws MusicLockingException, MusicServiceException, MusicQueryException {
522         String lockqtable = table_prepend_name + table;
523         String selectQuery = "select * from " + keyspace + "." + lockqtable + " where key=?;";
524
525         PreparedQueryObject queryObject = new PreparedQueryObject();
526         queryObject.appendQueryString(selectQuery);
527         queryObject.addValue(key);
528         ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
529         
530         long refToPromote = Long.parseLong(lockRef);
531
532         boolean topOfQueue = true;
533         boolean readBlock = false;
534         boolean seenLockToPromote = false;
535         boolean promotionOngoing = false;
536         long readBlockStart = 0;
537         long readBlockEnd = 0;
538
539
540         for (Row row : rs) {
541             long ref = row.getLong("lockreference");
542             LockType lockType = row.get("lockType", LockType.class);
543             
544             if (refToPromote==ref) {
545                 if (promotionOngoing) {
546                     return new ReturnType(ResultType.FAILURE, "Can't promote, already promoting another lockref.");
547                 }
548                 seenLockToPromote = true;
549                 if (!topOfQueue) {
550                     readBlockStart = ref;
551                     readBlockEnd = ref;
552                     break;
553                 }
554             } else if (!seenLockToPromote && refToPromote<ref) {
555                 return new ReturnType(ResultType.FAILURE, "Lockref does not exist.");
556             }
557             
558             if (lockType==LockType.READ || lockType==LockType.PROMOTING) {
559                 if (!readBlock) {
560                     readBlockStart = ref;
561                     readBlock = true;
562                 }
563                 if (readBlock) {
564                     readBlockEnd = ref;
565                 }
566                 if (lockType==LockType.PROMOTING) {
567                     promotionOngoing = true;
568                 }
569             }
570             
571             if (lockType==LockType.WRITE) {
572                 if (refToPromote==ref) {
573                     return new ReturnType(ResultType.FAILURE, "Lockref is already write.");
574                 }
575                 if (readBlock) {
576                     readBlock = false;
577                     promotionOngoing = false;
578                     if (seenLockToPromote) {
579                         break;
580                     }
581                     //can no longer be lock holder after this
582                     topOfQueue = false;
583                 }
584             }
585         }
586
587         if (readBlockStart<=refToPromote && refToPromote<=readBlockEnd) {
588             if (readBlockStart==refToPromote && refToPromote==readBlockEnd) {
589                 promoteLockTo(keyspace, lockqtable, key, lockRef, LockType.WRITE);
590                 return new ReturnType(ResultType.SUCCESS, "Lock has successfully been upgraded.");
591             }
592             promoteLockTo(keyspace, lockqtable, key, lockRef, LockType.PROMOTING);
593             return new ReturnType(ResultType.FAILURE, "Your lock upgrade is in progress. Check again to see if successful."); 
594         }
595         
596         //shouldn't reach here?
597         return new ReturnType(ResultType.FAILURE,"Promotion failed.");
598     }
599
600     private void promoteLockTo(String keyspace, String table, String key, String lockRef, LockType newLockType)
601             throws MusicServiceException, MusicQueryException {
602         PreparedQueryObject queryObject =
603                 new PreparedQueryObject("UPDATE " + keyspace + "." + table + " SET lockType=? WHERE key='" + key
604                         + "' AND lockReference = " + lockRef + " IF EXISTS;", newLockType);
605
606         //cannot use executePut because we need to ignore music timestamp adjustments for lock store
607         dsHandle.executePut(queryObject, MusicUtil.QUORUM);
608     }
609     
610
611
612 }