Changes Listed below:
[music.git] / src / main / java / org / onap / music / lockingservice / cassandra / CassaLockStore.java
1 /*
2  * ============LICENSE_START==========================================
3  * org.onap.music
4  * ===================================================================
5  *  Copyright (c) 2017 AT&T Intellectual Property
6  *  Modifications Copyright (C) 2019 IBM.
7  * ===================================================================
8  *  Licensed under the Apache License, Version 2.0 (the "License");
9  *  you may not use this file except in compliance with the License.
10  *  You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  *
20  * ============LICENSE_END=============================================
21  * ====================================================================
22  */
23
24 package org.onap.music.lockingservice.cassandra;
25
26 import java.util.ArrayList;
27 import java.util.List;
28
29 import org.onap.music.datastore.MusicDataStore;
30 import org.onap.music.datastore.PreparedQueryObject;
31 import org.onap.music.eelf.logging.EELFLoggerDelegate;
32 import org.onap.music.exceptions.MusicLockingException;
33 import org.onap.music.exceptions.MusicQueryException;
34 import org.onap.music.exceptions.MusicServiceException;
35 import org.onap.music.main.MusicUtil;
36
37 import com.datastax.driver.core.ResultSet;
38 import com.datastax.driver.core.Row;
39
40 /*
41  * This is the lock store that is built on top of Cassandra that is used by MUSIC to maintain lock state. 
42  */
43
44 public class CassaLockStore {
45     
46     private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(CassaLockStore.class);
47     private static String table_prepend_name = "lockQ_";
48     
49     public class LockObject{
50         public String lockRef;
51         public String createTime;
52         public String acquireTime;
53         public LockObject(String lockRef, String createTime,     String acquireTime) {
54             this.lockRef = lockRef;
55             this.acquireTime = acquireTime;
56             this.createTime = createTime;
57             
58         }
59     }
60     MusicDataStore dsHandle;
61     public CassaLockStore() {
62         dsHandle = new MusicDataStore();
63     }
64     
65     public CassaLockStore(MusicDataStore dsHandle) {
66         this.dsHandle=dsHandle;
67     }
68
69     
70     /**
71      * 
72      * This method creates a shadow locking table for every main table in Cassandra. This table tracks all information regarding locks. 
73      * @param keyspace of the application. 
74      * @param table of the application. 
75      * @return true if the operation was successful.
76      * @throws MusicServiceException
77      * @throws MusicQueryException
78      */
79     public boolean createLockQueue(String keyspace, String table) throws MusicServiceException, MusicQueryException {
80         logger.info(EELFLoggerDelegate.applicationLogger,
81                 "Create lock queue/table for " +  keyspace+"."+table);
82         table = table_prepend_name+table;
83         String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
84                 + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, PRIMARY KEY ((key), lockReference) ) "
85                 + "WITH CLUSTERING ORDER BY (lockReference ASC);";
86         PreparedQueryObject queryObject = new PreparedQueryObject();
87         
88         queryObject.appendQueryString(tabQuery);
89         boolean result;
90         result = dsHandle.executePut(queryObject, "eventual");
91         return result;
92     }
93
94     /**
95      * This method creates a lock reference for each invocation. The lock references are monotonically increasing timestamps.
96      * @param keyspace of the locks.
97      * @param table of the locks.
98      * @param lockName is the primary key of the lock table
99      * @return the UUID lock reference.
100      * @throws MusicServiceException
101      * @throws MusicQueryException
102      */
103     public String genLockRefandEnQueue(String keyspace, String table, String lockName) throws MusicServiceException, MusicQueryException, MusicLockingException {
104         return genLockRefandEnQueue(keyspace, table, lockName, 0);
105     }
106     
107     private String genLockRefandEnQueue(String keyspace, String table, String lockName, int count) throws MusicServiceException, MusicQueryException, MusicLockingException {
108         logger.info(EELFLoggerDelegate.applicationLogger,
109                 "Create lock reference for " +  keyspace + "." + table + "." + lockName);
110         String lockTable ="";
111         lockTable = table_prepend_name+table;
112        
113
114
115         PreparedQueryObject queryObject = new PreparedQueryObject();
116         String selectQuery = "SELECT guard FROM " + keyspace + "." + lockTable + " WHERE key=?;";
117
118         queryObject.addValue(lockName);
119         queryObject.appendQueryString(selectQuery);
120         ResultSet gqResult = dsHandle.executeOneConsistencyGet(queryObject);
121         List<Row> latestGuardRow = gqResult.all();
122
123         long prevGuard = 0;
124         long lockRef = 1;
125         if (!latestGuardRow.isEmpty()) {
126             prevGuard = latestGuardRow.get(0).getLong(0);
127             lockRef = prevGuard + 1;
128         }
129
130         long lockEpochMillis = System.currentTimeMillis();
131
132         logger.info(EELFLoggerDelegate.applicationLogger,
133                 "Created lock reference for " +  keyspace + "." + lockTable + "." + lockName + ":" + lockRef);
134
135         queryObject = new PreparedQueryObject();
136         String insQuery = "BEGIN BATCH" +
137                 " UPDATE " + keyspace + "." + lockTable +
138                 " SET guard=? WHERE key=? IF guard = " + (prevGuard == 0 ? "NULL" : "?") +";" +
139                 " INSERT INTO " + keyspace + "." + lockTable +
140                 "(key, lockReference, createTime, acquireTime) VALUES (?,?,?,?) IF NOT EXISTS; APPLY BATCH;";
141
142         queryObject.addValue(lockRef);
143         queryObject.addValue(lockName);
144         if (prevGuard != 0)
145             queryObject.addValue(prevGuard);
146
147         queryObject.addValue(lockName);
148         queryObject.addValue(lockRef);
149         queryObject.addValue(String.valueOf(lockEpochMillis));
150         queryObject.addValue("0");
151         queryObject.appendQueryString(insQuery);
152         boolean pResult = dsHandle.executePut(queryObject, "critical");
153         if (!pResult) {//couldn't create lock ref, retry
154             count++;
155             if (count>MusicUtil.getRetryCount()) {
156                 logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to create lock reference");
157                 throw new MusicLockingException("Unable to create lock reference");
158             }
159             return genLockRefandEnQueue(keyspace, table, lockName, count);
160         }
161         return "$" + keyspace + "." + table + "." + lockName + "$" + String.valueOf(lockRef);
162     }
163     
164     
165     
166     /**
167      * Returns a result set containing the list of clients waiting for a particular lock
168      * @param keyspace
169      * @param table
170      * @param key
171      * @return list of lockrefs in the queue
172      * @throws MusicServiceException
173      * @throws MusicQueryException
174      */
175     public List<String> getLockQueue(String keyspace, String table, String key)
176             throws MusicServiceException, MusicQueryException {
177         logger.info(EELFLoggerDelegate.applicationLogger,
178                 "Getting the queue for " +  keyspace+"."+table+"."+key);
179         table = table_prepend_name+table;
180         String selectQuery = "select * from " + keyspace + "." + table + " where key='" + key + "';";
181         PreparedQueryObject queryObject = new PreparedQueryObject();
182         queryObject.appendQueryString(selectQuery);
183         ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
184         ArrayList<String> lockQueue = new ArrayList<>();
185         for (Row row: rs) {
186             lockQueue.add(Long.toString(row.getLong("lockReference")));
187         }
188         return lockQueue;
189     }
190     
191     
192     /**
193      * Returns a result set containing the list of clients waiting for a particular lock
194      * @param keyspace
195      * @param table
196      * @param key
197      * @return size of lockrefs queue
198      * @throws MusicServiceException
199      * @throws MusicQueryException
200      */
201     public long getLockQueueSize(String keyspace, String table, String key)
202             throws MusicServiceException, MusicQueryException {
203         logger.info(EELFLoggerDelegate.applicationLogger,
204                 "Getting the queue size for " +  keyspace+"."+table+"."+key);
205         table = table_prepend_name+table;
206         String selectQuery = "select count(*) from " + keyspace + "." + table + " where key='" + key + "';";
207         PreparedQueryObject queryObject = new PreparedQueryObject();
208         queryObject.appendQueryString(selectQuery);
209         ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
210         return rs.one().getLong("count");
211     }
212
213
214     /**
215      * This method returns the top of lock table/queue for the key.
216      * @param keyspace of the application.
217      * @param table of the application.
218      * @param key is the primary key of the application table
219      * @return the UUID lock reference. Returns null if there is no owner or the lock doesn't exist
220      * @throws MusicServiceException
221      * @throws MusicQueryException
222      */
223     public LockObject peekLockQueue(String keyspace, String table, String key) throws MusicServiceException, MusicQueryException{
224         logger.info(EELFLoggerDelegate.applicationLogger,
225                 "Peek in lock table for " +  keyspace+"."+table+"."+key);
226         table = table_prepend_name+table; 
227         String selectQuery = "select * from "+keyspace+"."+table+" where key='"+key+"' LIMIT 1;";    
228         PreparedQueryObject queryObject = new PreparedQueryObject();
229         queryObject.appendQueryString(selectQuery);
230         ResultSet results = dsHandle.executeOneConsistencyGet(queryObject);
231         Row row = results.one();
232         if (row==null || row.isNull("lockReference")) {
233             return null;
234         }
235         String lockReference = "" + row.getLong("lockReference");
236         String createTime = row.getString("createTime");
237         String acquireTime = row.getString("acquireTime");
238
239         return new LockObject(lockReference, createTime,acquireTime);
240     }
241     
242     
243     /**
244      * This method removes the lock ref from the lock table/queue for the key. 
245      * @param keyspace of the application. 
246      * @param table of the application. 
247      * @param key is the primary key of the application table
248      * @param lockReference the lock reference that needs to be dequeued.
249      * @throws MusicServiceException
250      * @throws MusicQueryException
251      * @throws MusicLockingException 
252      */    
253     public void deQueueLockRef(String keyspace, String table, String key, String lockReference, int n) throws MusicServiceException, MusicQueryException, MusicLockingException{
254         String prependTable = table_prepend_name+table;
255         PreparedQueryObject queryObject = new PreparedQueryObject();
256         Long lockReferenceL = Long.parseLong(lockReference.substring(lockReference.lastIndexOf("$")+1));
257         String deleteQuery = "delete from "+keyspace+"."+prependTable+" where key='"+key+"' AND lockReference ="+lockReferenceL+" IF EXISTS;";
258         queryObject.appendQueryString(deleteQuery);
259         logger.info(EELFLoggerDelegate.applicationLogger, "Removing lock for key: "+key+ " and reference: "+lockReference);
260         try {
261         dsHandle.executePut(queryObject, "critical"); 
262         logger.info(EELFLoggerDelegate.applicationLogger, "Lock removed for key: "+key+ " and reference: "+lockReference);
263         }catch(MusicServiceException ex) {
264             logger.error(logger, ex.getMessage(),ex);
265             logger.error(EELFLoggerDelegate.applicationLogger,"Exception while deQueueLockRef for lockname: " + key + " reference:" +lockReference);
266             if(n>1) {
267                 logger.info(EELFLoggerDelegate.applicationLogger, "Trying again...");
268                 deQueueLockRef(keyspace, table, key, lockReference, n-1);
269             }
270             else {
271                 logger.error(EELFLoggerDelegate.applicationLogger,"deQueueLockRef failed for lockname: " + key + " reference:" +lockReference);
272                 logger.error(logger, ex.getMessage(),ex);
273                 throw new MusicLockingException("Error while deQueueLockRef: "+ex.getMessage());
274             }
275         }
276     }
277     
278
279     public void updateLockAcquireTime(String keyspace, String table, String key, String lockReference) throws MusicServiceException, MusicQueryException{
280         table = table_prepend_name+table;
281         PreparedQueryObject queryObject = new PreparedQueryObject();
282         Long lockReferenceL = Long.parseLong(lockReference);
283         String updateQuery = "update "+keyspace+"."+table+" set acquireTime='"+ System.currentTimeMillis()+"' where key='"+key+"' AND lockReference = "+lockReferenceL+" IF EXISTS;";
284         queryObject.appendQueryString(updateQuery);
285         dsHandle.executePut(queryObject, "eventual");    
286
287     }
288     
289
290 }