2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * Modifications Copyright (C) 2019 IBM.
7 * ===================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
20 * ============LICENSE_END=============================================
21 * ====================================================================
24 package org.onap.music.lockingservice.cassandra;
26 import java.util.ArrayList;
27 import java.util.List;
29 import org.onap.music.datastore.MusicDataStore;
30 import org.onap.music.datastore.PreparedQueryObject;
31 import org.onap.music.eelf.logging.EELFLoggerDelegate;
32 import org.onap.music.exceptions.MusicLockingException;
33 import org.onap.music.exceptions.MusicQueryException;
34 import org.onap.music.exceptions.MusicServiceException;
35 import org.onap.music.main.MusicUtil;
37 import com.datastax.driver.core.ResultSet;
38 import com.datastax.driver.core.Row;
/**
 * This is the lock store that is built on top of Cassandra that is used by MUSIC to maintain lock state.
 */
44 public class CassaLockStore {
46 private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(CassaLockStore.class);
47 private static String table_prepend_name = "lockQ_";
/**
 * Plain value holder describing one entry of a lock queue: the lock reference
 * together with its creation and acquisition timestamps, all kept as strings.
 */
public class LockObject {
    /** Lock reference of the queue entry. */
    public String lockRef;
    /** Creation time recorded for the entry. */
    public String createTime;
    /** Acquisition time recorded for the entry. */
    public String acquireTime;

    /**
     * @param lockRef lock reference of the queue entry
     * @param createTime creation time recorded for the entry
     * @param acquireTime acquisition time recorded for the entry
     */
    public LockObject(String lockRef, String createTime, String acquireTime) {
        this.lockRef = lockRef;
        this.createTime = createTime;
        this.acquireTime = acquireTime;
    }
}
60 MusicDataStore dsHandle;
61 public CassaLockStore() {
62 dsHandle = new MusicDataStore();
65 public CassaLockStore(MusicDataStore dsHandle) {
66 this.dsHandle=dsHandle;
72 * This method creates a shadow locking table for every main table in Cassandra. This table tracks all information regarding locks.
73 * @param keyspace of the application.
74 * @param table of the application.
75 * @return true if the operation was successful.
76 * @throws MusicServiceException
77 * @throws MusicQueryException
79 public boolean createLockQueue(String keyspace, String table) throws MusicServiceException, MusicQueryException {
80 logger.info(EELFLoggerDelegate.applicationLogger,
81 "Create lock queue/table for " + keyspace+"."+table);
82 table = table_prepend_name+table;
83 String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
84 + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, PRIMARY KEY ((key), lockReference) ) "
85 + "WITH CLUSTERING ORDER BY (lockReference ASC);";
86 PreparedQueryObject queryObject = new PreparedQueryObject();
88 queryObject.appendQueryString(tabQuery);
90 result = dsHandle.executePut(queryObject, "eventual");
95 * This method creates a lock reference for each invocation. The lock references are monotonically increasing timestamps.
96 * @param keyspace of the locks.
97 * @param table of the locks.
98 * @param lockName is the primary key of the lock table
99 * @return the UUID lock reference.
100 * @throws MusicServiceException
101 * @throws MusicQueryException
103 public String genLockRefandEnQueue(String keyspace, String table, String lockName) throws MusicServiceException, MusicQueryException, MusicLockingException {
104 return genLockRefandEnQueue(keyspace, table, lockName, 0);
107 private String genLockRefandEnQueue(String keyspace, String table, String lockName, int count) throws MusicServiceException, MusicQueryException, MusicLockingException {
108 logger.info(EELFLoggerDelegate.applicationLogger,
109 "Create lock reference for " + keyspace + "." + table + "." + lockName);
110 String lockTable ="";
111 lockTable = table_prepend_name+table;
115 PreparedQueryObject queryObject = new PreparedQueryObject();
116 String selectQuery = "SELECT guard FROM " + keyspace + "." + lockTable + " WHERE key=?;";
118 queryObject.addValue(lockName);
119 queryObject.appendQueryString(selectQuery);
120 ResultSet gqResult = dsHandle.executeOneConsistencyGet(queryObject);
121 List<Row> latestGuardRow = gqResult.all();
125 if (!latestGuardRow.isEmpty()) {
126 prevGuard = latestGuardRow.get(0).getLong(0);
127 lockRef = prevGuard + 1;
130 long lockEpochMillis = System.currentTimeMillis();
132 logger.info(EELFLoggerDelegate.applicationLogger,
133 "Created lock reference for " + keyspace + "." + lockTable + "." + lockName + ":" + lockRef);
135 queryObject = new PreparedQueryObject();
136 String insQuery = "BEGIN BATCH" +
137 " UPDATE " + keyspace + "." + lockTable +
138 " SET guard=? WHERE key=? IF guard = " + (prevGuard == 0 ? "NULL" : "?") +";" +
139 " INSERT INTO " + keyspace + "." + lockTable +
140 "(key, lockReference, createTime, acquireTime) VALUES (?,?,?,?) IF NOT EXISTS; APPLY BATCH;";
142 queryObject.addValue(lockRef);
143 queryObject.addValue(lockName);
145 queryObject.addValue(prevGuard);
147 queryObject.addValue(lockName);
148 queryObject.addValue(lockRef);
149 queryObject.addValue(String.valueOf(lockEpochMillis));
150 queryObject.addValue("0");
151 queryObject.appendQueryString(insQuery);
152 boolean pResult = dsHandle.executePut(queryObject, "critical");
153 if (!pResult) {//couldn't create lock ref, retry
155 if (count>MusicUtil.getRetryCount()) {
156 logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to create lock reference");
157 throw new MusicLockingException("Unable to create lock reference");
159 return genLockRefandEnQueue(keyspace, table, lockName, count);
161 return "$" + keyspace + "." + table + "." + lockName + "$" + String.valueOf(lockRef);
167 * Returns a result set containing the list of clients waiting for a particular lock
171 * @return list of lockrefs in the queue
172 * @throws MusicServiceException
173 * @throws MusicQueryException
175 public List<String> getLockQueue(String keyspace, String table, String key)
176 throws MusicServiceException, MusicQueryException {
177 logger.info(EELFLoggerDelegate.applicationLogger,
178 "Getting the queue for " + keyspace+"."+table+"."+key);
179 table = table_prepend_name+table;
180 String selectQuery = "select * from " + keyspace + "." + table + " where key='" + key + "';";
181 PreparedQueryObject queryObject = new PreparedQueryObject();
182 queryObject.appendQueryString(selectQuery);
183 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
184 ArrayList<String> lockQueue = new ArrayList<>();
186 lockQueue.add(Long.toString(row.getLong("lockReference")));
193 * Returns a result set containing the list of clients waiting for a particular lock
197 * @return size of lockrefs queue
198 * @throws MusicServiceException
199 * @throws MusicQueryException
201 public long getLockQueueSize(String keyspace, String table, String key)
202 throws MusicServiceException, MusicQueryException {
203 logger.info(EELFLoggerDelegate.applicationLogger,
204 "Getting the queue size for " + keyspace+"."+table+"."+key);
205 table = table_prepend_name+table;
206 String selectQuery = "select count(*) from " + keyspace + "." + table + " where key='" + key + "';";
207 PreparedQueryObject queryObject = new PreparedQueryObject();
208 queryObject.appendQueryString(selectQuery);
209 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
210 return rs.one().getLong("count");
215 * This method returns the top of lock table/queue for the key.
216 * @param keyspace of the application.
217 * @param table of the application.
218 * @param key is the primary key of the application table
219 * @return the UUID lock reference. Returns null if there is no owner or the lock doesn't exist
220 * @throws MusicServiceException
221 * @throws MusicQueryException
223 public LockObject peekLockQueue(String keyspace, String table, String key) throws MusicServiceException, MusicQueryException{
224 logger.info(EELFLoggerDelegate.applicationLogger,
225 "Peek in lock table for " + keyspace+"."+table+"."+key);
226 table = table_prepend_name+table;
227 String selectQuery = "select * from "+keyspace+"."+table+" where key='"+key+"' LIMIT 1;";
228 PreparedQueryObject queryObject = new PreparedQueryObject();
229 queryObject.appendQueryString(selectQuery);
230 ResultSet results = dsHandle.executeOneConsistencyGet(queryObject);
231 Row row = results.one();
232 if (row==null || row.isNull("lockReference")) {
235 String lockReference = "" + row.getLong("lockReference");
236 String createTime = row.getString("createTime");
237 String acquireTime = row.getString("acquireTime");
239 return new LockObject(lockReference, createTime,acquireTime);
244 * This method removes the lock ref from the lock table/queue for the key.
245 * @param keyspace of the application.
246 * @param table of the application.
247 * @param key is the primary key of the application table
248 * @param lockReference the lock reference that needs to be dequeued.
249 * @throws MusicServiceException
250 * @throws MusicQueryException
251 * @throws MusicLockingException
253 public void deQueueLockRef(String keyspace, String table, String key, String lockReference, int n) throws MusicServiceException, MusicQueryException, MusicLockingException{
254 String prependTable = table_prepend_name+table;
255 PreparedQueryObject queryObject = new PreparedQueryObject();
256 Long lockReferenceL = Long.parseLong(lockReference.substring(lockReference.lastIndexOf("$")+1));
257 String deleteQuery = "delete from "+keyspace+"."+prependTable+" where key='"+key+"' AND lockReference ="+lockReferenceL+" IF EXISTS;";
258 queryObject.appendQueryString(deleteQuery);
259 logger.info(EELFLoggerDelegate.applicationLogger, "Removing lock for key: "+key+ " and reference: "+lockReference);
261 dsHandle.executePut(queryObject, "critical");
262 logger.info(EELFLoggerDelegate.applicationLogger, "Lock removed for key: "+key+ " and reference: "+lockReference);
263 }catch(MusicServiceException ex) {
264 logger.error(logger, ex.getMessage(),ex);
265 logger.error(EELFLoggerDelegate.applicationLogger,"Exception while deQueueLockRef for lockname: " + key + " reference:" +lockReference);
267 logger.info(EELFLoggerDelegate.applicationLogger, "Trying again...");
268 deQueueLockRef(keyspace, table, key, lockReference, n-1);
271 logger.error(EELFLoggerDelegate.applicationLogger,"deQueueLockRef failed for lockname: " + key + " reference:" +lockReference);
272 logger.error(logger, ex.getMessage(),ex);
273 throw new MusicLockingException("Error while deQueueLockRef: "+ex.getMessage());
279 public void updateLockAcquireTime(String keyspace, String table, String key, String lockReference) throws MusicServiceException, MusicQueryException{
280 table = table_prepend_name+table;
281 PreparedQueryObject queryObject = new PreparedQueryObject();
282 Long lockReferenceL = Long.parseLong(lockReference);
283 String updateQuery = "update "+keyspace+"."+table+" set acquireTime='"+ System.currentTimeMillis()+"' where key='"+key+"' AND lockReference = "+lockReferenceL+" IF EXISTS;";
284 queryObject.appendQueryString(updateQuery);
285 dsHandle.executePut(queryObject, "eventual");