Merge "MusicPolicyVoilationException-Junit TestCase added"
[music.git] / src / main / java / org / onap / music / lockingservice / cassandra / CassaLockStore.java
1 /*
2  * ============LICENSE_START==========================================
3  * org.onap.music
4  * ===================================================================
5  *  Copyright (c) 2017 AT&T Intellectual Property
6  * ===================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  *
19  * ============LICENSE_END=============================================
20  * ====================================================================
21  */
22
23 package org.onap.music.lockingservice.cassandra;
24
25 import java.util.ArrayList;
26 import java.util.List;
27
28 import org.onap.music.datastore.MusicDataStore;
29 import org.onap.music.datastore.PreparedQueryObject;
30 import org.onap.music.eelf.logging.EELFLoggerDelegate;
31 import org.onap.music.exceptions.MusicQueryException;
32 import org.onap.music.exceptions.MusicServiceException;
33
34 import com.datastax.driver.core.ResultSet;
35 import com.datastax.driver.core.Row;
36
37 /*
38  * This is the lock store that is built on top of Cassandra that is used by MUSIC to maintain lock state. 
39  */
40
41 public class CassaLockStore {
42     
43     private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(CassaLockStore.class);
44     private static String table_prepend_name = "lockQ_";
45     
46     public class LockObject{
47         public String lockRef;
48         public String createTime;
49         public String acquireTime;
50         public LockObject(String lockRef, String createTime,     String acquireTime) {
51             this.lockRef = lockRef;
52             this.acquireTime = acquireTime;
53             this.createTime = createTime;
54             
55         }
56     }
57     MusicDataStore dsHandle;
58     public CassaLockStore() {
59         dsHandle = new MusicDataStore();
60     }
61     
62     public CassaLockStore(MusicDataStore dsHandle) {
63         this.dsHandle=dsHandle;
64     }
65
66     
67     /**
68      * 
69      * This method creates a shadow locking table for every main table in Cassandra. This table tracks all information regarding locks. 
70      * @param keyspace of the application. 
71      * @param table of the application. 
72      * @return true if the operation was successful.
73      * @throws MusicServiceException
74      * @throws MusicQueryException
75      */
76     public boolean createLockQueue(String keyspace, String table) throws MusicServiceException, MusicQueryException {
77         logger.info(EELFLoggerDelegate.applicationLogger,
78                 "Create lock queue/table for " +  keyspace+"."+table);
79         table = table_prepend_name+table;
80         String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
81                 + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, PRIMARY KEY ((key), lockReference) ) "
82                 + "WITH CLUSTERING ORDER BY (lockReference ASC);";
83         PreparedQueryObject queryObject = new PreparedQueryObject();
84         
85         queryObject.appendQueryString(tabQuery);
86         boolean result;
87         result = dsHandle.executePut(queryObject, "eventual");
88         return result;
89     }
90
91     /**
92      * This method creates a lock reference for each invocation. The lock references are monotonically increasing timestamps.
93      * @param keyspace of the locks.
94      * @param table of the locks.
95      * @param lockName is the primary key of the lock table
96      * @return the UUID lock reference.
97      * @throws MusicServiceException
98      * @throws MusicQueryException
99      */
100     public String genLockRefandEnQueue(String keyspace, String table, String lockName) throws MusicServiceException, MusicQueryException {
101         logger.info(EELFLoggerDelegate.applicationLogger,
102                 "Create lock reference for " +  keyspace + "." + table + "." + lockName);
103         String lockTable ="";
104         lockTable = table_prepend_name+table;
105        
106
107
108         PreparedQueryObject queryObject = new PreparedQueryObject();
109         String selectQuery = "SELECT guard FROM " + keyspace + "." + lockTable + " WHERE key=?;";
110
111         queryObject.addValue(lockName);
112         queryObject.appendQueryString(selectQuery);
113         ResultSet gqResult = dsHandle.executeOneConsistencyGet(queryObject);
114         List<Row> latestGuardRow = gqResult.all();
115
116         long prevGuard = 0;
117         long lockRef = 1;
118         if (latestGuardRow.size() > 0) {
119             prevGuard = latestGuardRow.get(0).getLong(0);
120             lockRef = prevGuard + 1;
121         }
122
123         long lockEpochMillis = System.currentTimeMillis();
124
125 //        System.out.println("guard(" + lockName + "): " + prevGuard + "->" + lockRef);
126         logger.info(EELFLoggerDelegate.applicationLogger,
127                 "Created lock reference for " +  keyspace + "." + lockTable + "." + lockName + ":" + lockRef);
128
129         queryObject = new PreparedQueryObject();
130         String insQuery = "BEGIN BATCH" +
131                 " UPDATE " + keyspace + "." + lockTable +
132                 " SET guard=? WHERE key=? IF guard = " + (prevGuard == 0 ? "NULL" : "?") +";" +
133                 " INSERT INTO " + keyspace + "." + lockTable +
134                 "(key, lockReference, createTime, acquireTime) VALUES (?,?,?,?) IF NOT EXISTS; APPLY BATCH;";
135
136         queryObject.addValue(lockRef);
137         queryObject.addValue(lockName);
138         if (prevGuard != 0)
139             queryObject.addValue(prevGuard);
140
141         queryObject.addValue(lockName);
142         queryObject.addValue(lockRef);
143         queryObject.addValue(String.valueOf(lockEpochMillis));
144         queryObject.addValue("0");
145         queryObject.appendQueryString(insQuery);
146         boolean pResult = dsHandle.executePut(queryObject, "critical");
147         return "$"+keyspace+"."+table+"."+lockName+"$"+String.valueOf(lockRef);
148     }
149     
150     /**
151      * Returns a result set containing the list of clients waiting for a particular lock
152      * @param keyspace
153      * @param table
154      * @param key
155      * @return list of lockrefs in the queue
156      * @throws MusicServiceException
157      * @throws MusicQueryException
158      */
159     public List<String> getLockQueue(String keyspace, String table, String key)
160             throws MusicServiceException, MusicQueryException {
161         logger.info(EELFLoggerDelegate.applicationLogger,
162                 "Getting the queue for " +  keyspace+"."+table+"."+key);
163         table = table_prepend_name+table;
164         String selectQuery = "select * from " + keyspace + "." + table + " where key='" + key + "';";
165         PreparedQueryObject queryObject = new PreparedQueryObject();
166         queryObject.appendQueryString(selectQuery);
167         ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
168         ArrayList<String> lockQueue = new ArrayList<>();
169         for (Row row: rs) {
170             lockQueue.add(Long.toString(row.getLong("lockReference")));
171         }
172         return lockQueue;
173     }
174     
175     
176     /**
177      * Returns a result set containing the list of clients waiting for a particular lock
178      * @param keyspace
179      * @param table
180      * @param key
181      * @return size of lockrefs queue
182      * @throws MusicServiceException
183      * @throws MusicQueryException
184      */
185     public long getLockQueueSize(String keyspace, String table, String key)
186             throws MusicServiceException, MusicQueryException {
187         logger.info(EELFLoggerDelegate.applicationLogger,
188                 "Getting the queue size for " +  keyspace+"."+table+"."+key);
189         table = table_prepend_name+table;
190         String selectQuery = "select count(*) from " + keyspace + "." + table + " where key='" + key + "';";
191         PreparedQueryObject queryObject = new PreparedQueryObject();
192         queryObject.appendQueryString(selectQuery);
193         ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
194         return rs.one().getLong("count");
195     }
196
197
198     /**
199      * This method returns the top of lock table/queue for the key.
200      * @param keyspace of the application.
201      * @param table of the application.
202      * @param key is the primary key of the application table
203      * @return the UUID lock reference.
204      * @throws MusicServiceException
205      * @throws MusicQueryException
206      */
207     public LockObject peekLockQueue(String keyspace, String table, String key) throws MusicServiceException, MusicQueryException{
208         logger.info(EELFLoggerDelegate.applicationLogger,
209                 "Peek in lock table for " +  keyspace+"."+table+"."+key);
210         table = table_prepend_name+table; 
211         String selectQuery = "select * from "+keyspace+"."+table+" where key='"+key+"' LIMIT 1;";    
212         PreparedQueryObject queryObject = new PreparedQueryObject();
213         queryObject.appendQueryString(selectQuery);
214         ResultSet results = dsHandle.executeOneConsistencyGet(queryObject);
215         Row row = results.one();
216         String lockReference = "" + row.getLong("lockReference");
217         String createTime = row.getString("createTime");
218         String acquireTime = row.getString("acquireTime");
219
220         return new LockObject(lockReference, createTime,acquireTime);
221     }
222     
223     
224     /**
225      * This method removes the lock ref from the lock table/queue for the key. 
226      * @param keyspace of the application. 
227      * @param table of the application. 
228      * @param key is the primary key of the application table
229      * @param lockReference the lock reference that needs to be dequeued.
230      * @throws MusicServiceException
231      * @throws MusicQueryException
232      */    
233     public void deQueueLockRef(String keyspace, String table, String key, String lockReference) throws MusicServiceException, MusicQueryException{
234         table = table_prepend_name+table; 
235         PreparedQueryObject queryObject = new PreparedQueryObject();
236         Long lockReferenceL = Long.parseLong(lockReference.substring(lockReference.lastIndexOf("$")+1));
237         String deleteQuery = "delete from "+keyspace+"."+table+" where key='"+key+"' AND lockReference ="+lockReferenceL+" IF EXISTS;";
238         queryObject.appendQueryString(deleteQuery);
239         dsHandle.executePut(queryObject, "critical");    
240     }
241     
242
243     public void updateLockAcquireTime(String keyspace, String table, String key, String lockReference) throws MusicServiceException, MusicQueryException{
244         table = table_prepend_name+table;
245         PreparedQueryObject queryObject = new PreparedQueryObject();
246         Long lockReferenceL = Long.parseLong(lockReference);
247         String updateQuery = "update "+keyspace+"."+table+" set acquireTime='"+ System.currentTimeMillis()+"' where key='"+key+"' AND lockReference = "+lockReferenceL+" IF EXISTS;";
248         queryObject.appendQueryString(updateQuery);
249         dsHandle.executePut(queryObject, "eventual");    
250
251     }
252     
253
254 }