2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * Modifications Copyright (C) 2019 IBM.
7 * ===================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
20 * ============LICENSE_END=============================================
21 * ====================================================================
24 package org.onap.music.lockingservice.cassandra;
26 import java.util.ArrayList;
27 import java.util.List;
29 import org.onap.music.datastore.MusicDataStore;
30 import org.onap.music.datastore.PreparedQueryObject;
31 import org.onap.music.eelf.logging.EELFLoggerDelegate;
32 import org.onap.music.exceptions.MusicQueryException;
33 import org.onap.music.exceptions.MusicServiceException;
35 import com.datastax.driver.core.ResultSet;
36 import com.datastax.driver.core.Row;
39 * This is the lock store that is built on top of Cassandra that is used by MUSIC to maintain lock state.
42 public class CassaLockStore {
44 private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(CassaLockStore.class);
45 private static String table_prepend_name = "lockQ_";
/**
 * Plain holder for one lock-queue entry: the lock reference plus the
 * creation and acquisition times stored alongside it.
 */
public class LockObject {
    /** Lock reference of this queue entry. */
    public String lockRef;
    /** Value of the entry's {@code createTime} column. */
    public String createTime;
    /** Value of the entry's {@code acquireTime} column. */
    public String acquireTime;

    /**
     * @param lockRef the lock reference of the entry
     * @param createTime the time the entry was created
     * @param acquireTime the time the lock was acquired
     */
    public LockObject(String lockRef, String createTime, String acquireTime) {
        this.lockRef = lockRef;
        this.acquireTime = acquireTime;
        this.createTime = createTime;
    }
}
58 MusicDataStore dsHandle;
59 public CassaLockStore() {
60 dsHandle = new MusicDataStore();
63 public CassaLockStore(MusicDataStore dsHandle) {
64 this.dsHandle=dsHandle;
70 * This method creates a shadow locking table for every main table in Cassandra. This table tracks all information regarding locks.
71 * @param keyspace of the application.
72 * @param table of the application.
73 * @return true if the operation was successful.
74 * @throws MusicServiceException
75 * @throws MusicQueryException
77 public boolean createLockQueue(String keyspace, String table) throws MusicServiceException, MusicQueryException {
78 logger.info(EELFLoggerDelegate.applicationLogger,
79 "Create lock queue/table for " + keyspace+"."+table);
80 table = table_prepend_name+table;
81 String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
82 + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, PRIMARY KEY ((key), lockReference) ) "
83 + "WITH CLUSTERING ORDER BY (lockReference ASC);";
84 PreparedQueryObject queryObject = new PreparedQueryObject();
86 queryObject.appendQueryString(tabQuery);
88 result = dsHandle.executePut(queryObject, "eventual");
93 * This method creates a lock reference for each invocation. The lock references are monotonically increasing timestamps.
94 * @param keyspace of the locks.
95 * @param table of the locks.
96 * @param lockName is the primary key of the lock table
97 * @return the UUID lock reference.
98 * @throws MusicServiceException
99 * @throws MusicQueryException
101 public String genLockRefandEnQueue(String keyspace, String table, String lockName) throws MusicServiceException, MusicQueryException {
102 logger.info(EELFLoggerDelegate.applicationLogger,
103 "Create lock reference for " + keyspace + "." + table + "." + lockName);
104 String lockTable ="";
105 lockTable = table_prepend_name+table;
109 PreparedQueryObject queryObject = new PreparedQueryObject();
110 String selectQuery = "SELECT guard FROM " + keyspace + "." + lockTable + " WHERE key=?;";
112 queryObject.addValue(lockName);
113 queryObject.appendQueryString(selectQuery);
114 ResultSet gqResult = dsHandle.executeOneConsistencyGet(queryObject);
115 List<Row> latestGuardRow = gqResult.all();
119 if (!latestGuardRow.isEmpty()) {
120 prevGuard = latestGuardRow.get(0).getLong(0);
121 lockRef = prevGuard + 1;
124 long lockEpochMillis = System.currentTimeMillis();
126 logger.info(EELFLoggerDelegate.applicationLogger,
127 "Created lock reference for " + keyspace + "." + lockTable + "." + lockName + ":" + lockRef);
129 queryObject = new PreparedQueryObject();
130 String insQuery = "BEGIN BATCH" +
131 " UPDATE " + keyspace + "." + lockTable +
132 " SET guard=? WHERE key=? IF guard = " + (prevGuard == 0 ? "NULL" : "?") +";" +
133 " INSERT INTO " + keyspace + "." + lockTable +
134 "(key, lockReference, createTime, acquireTime) VALUES (?,?,?,?) IF NOT EXISTS; APPLY BATCH;";
136 queryObject.addValue(lockRef);
137 queryObject.addValue(lockName);
139 queryObject.addValue(prevGuard);
141 queryObject.addValue(lockName);
142 queryObject.addValue(lockRef);
143 queryObject.addValue(String.valueOf(lockEpochMillis));
144 queryObject.addValue("0");
145 queryObject.appendQueryString(insQuery);
146 dsHandle.executePut(queryObject, "critical");
147 return "$"+keyspace+"."+table+"."+lockName+"$"+ lockRef;
151 * Returns a result set containing the list of clients waiting for a particular lock
155 * @return list of lockrefs in the queue
156 * @throws MusicServiceException
157 * @throws MusicQueryException
159 public List<String> getLockQueue(String keyspace, String table, String key)
160 throws MusicServiceException, MusicQueryException {
161 logger.info(EELFLoggerDelegate.applicationLogger,
162 "Getting the queue for " + keyspace+"."+table+"."+key);
163 table = table_prepend_name+table;
164 String selectQuery = "select * from " + keyspace + "." + table + " where key='" + key + "';";
165 PreparedQueryObject queryObject = new PreparedQueryObject();
166 queryObject.appendQueryString(selectQuery);
167 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
168 ArrayList<String> lockQueue = new ArrayList<>();
170 lockQueue.add(Long.toString(row.getLong("lockReference")));
177 * Returns a result set containing the list of clients waiting for a particular lock
181 * @return size of lockrefs queue
182 * @throws MusicServiceException
183 * @throws MusicQueryException
185 public long getLockQueueSize(String keyspace, String table, String key)
186 throws MusicServiceException, MusicQueryException {
187 logger.info(EELFLoggerDelegate.applicationLogger,
188 "Getting the queue size for " + keyspace+"."+table+"."+key);
189 table = table_prepend_name+table;
190 String selectQuery = "select count(*) from " + keyspace + "." + table + " where key='" + key + "';";
191 PreparedQueryObject queryObject = new PreparedQueryObject();
192 queryObject.appendQueryString(selectQuery);
193 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
194 return rs.one().getLong("count");
199 * This method returns the top of lock table/queue for the key.
200 * @param keyspace of the application.
201 * @param table of the application.
202 * @param key is the primary key of the application table
203 * @return the UUID lock reference.
204 * @throws MusicServiceException
205 * @throws MusicQueryException
207 public LockObject peekLockQueue(String keyspace, String table, String key) throws MusicServiceException, MusicQueryException{
208 logger.info(EELFLoggerDelegate.applicationLogger,
209 "Peek in lock table for " + keyspace+"."+table+"."+key);
210 table = table_prepend_name+table;
211 String selectQuery = "select * from "+keyspace+"."+table+" where key='"+key+"' LIMIT 1;";
212 PreparedQueryObject queryObject = new PreparedQueryObject();
213 queryObject.appendQueryString(selectQuery);
214 ResultSet results = dsHandle.executeOneConsistencyGet(queryObject);
215 Row row = results.one();
216 String lockReference = "" + row.getLong("lockReference");
217 String createTime = row.getString("createTime");
218 String acquireTime = row.getString("acquireTime");
220 return new LockObject(lockReference, createTime,acquireTime);
225 * This method removes the lock ref from the lock table/queue for the key.
226 * @param keyspace of the application.
227 * @param table of the application.
228 * @param key is the primary key of the application table
229 * @param lockReference the lock reference that needs to be dequeued.
230 * @throws MusicServiceException
231 * @throws MusicQueryException
233 public void deQueueLockRef(String keyspace, String table, String key, String lockReference) throws MusicServiceException, MusicQueryException{
234 table = table_prepend_name+table;
235 PreparedQueryObject queryObject = new PreparedQueryObject();
236 Long lockReferenceL = Long.parseLong(lockReference.substring(lockReference.lastIndexOf('$')+1));
237 String deleteQuery = "delete from "+keyspace+"."+table+" where key='"+key+"' AND lockReference ="+lockReferenceL+" IF EXISTS;";
238 queryObject.appendQueryString(deleteQuery);
239 dsHandle.executePut(queryObject, "critical");
243 public void updateLockAcquireTime(String keyspace, String table, String key, String lockReference) throws MusicServiceException, MusicQueryException{
244 table = table_prepend_name+table;
245 PreparedQueryObject queryObject = new PreparedQueryObject();
246 Long lockReferenceL = Long.parseLong(lockReference);
247 String updateQuery = "update "+keyspace+"."+table+" set acquireTime='"+ System.currentTimeMillis()+"' where key='"+key+"' AND lockReference = "+lockReferenceL+" IF EXISTS;";
248 queryObject.appendQueryString(updateQuery);
249 dsHandle.executePut(queryObject, "eventual");