Initial code Import.
[music.git] / src / main / java / org / onap / music / client / MusicClient.java
1 /*
2  * ============LICENSE_START==========================================
3  * org.onap.music
4  * ===================================================================
5  *  Copyright (c) 2017 AT&T Intellectual Property
6  * ===================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  * 
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  * 
19  * ============LICENSE_END=============================================
20  * ====================================================================
21  */
22 package org.onap.music.client;
23
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.Collections;
27 import java.util.HashMap;
28 import java.util.HashSet;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Set;
32 import org.onap.music.datastore.jsonobjects.JsonInsert;
33 import org.onap.music.datastore.jsonobjects.JsonKeySpace;
34 import org.onap.music.datastore.jsonobjects.JsonTable;
35 import org.onap.music.lockingservice.MusicLockingService;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38 import com.datastax.driver.core.Cluster;
39 import com.datastax.driver.core.ColumnDefinitions;
40 import com.datastax.driver.core.ColumnDefinitions.Definition;
41 import com.datastax.driver.core.ConsistencyLevel;
42 import com.datastax.driver.core.DataType;
43 import com.datastax.driver.core.KeyspaceMetadata;
44 import com.datastax.driver.core.ResultSet;
45 import com.datastax.driver.core.Row;
46 import com.datastax.driver.core.Session;
47 import com.datastax.driver.core.SimpleStatement;
48 import com.datastax.driver.core.Statement;
49 import com.datastax.driver.core.TableMetadata;
50 import com.datastax.driver.core.querybuilder.Clause;
51 import com.datastax.driver.core.querybuilder.Delete;
52 import com.datastax.driver.core.querybuilder.QueryBuilder;
53 import com.datastax.driver.core.querybuilder.Select;
54
55 /**
56  * A MUSIC client that talks directly to Cassandra/ZooKeeper. This was taken, and slightly modified,
57  * from the REST version of the code.
58  *
59  * @author Robert Eby
60  */
61 public class MusicClient {
62     private static final Logger LOG = LoggerFactory.getLogger(MusicClient.class);
63
64     private final String[] music_hosts; // array of hosts in the music cluster
65     private Cluster cluster; // MUSIC Cassandra cluster
66     private Session session; // MUSIC Cassandra session
67     private MusicLockingService mls; // ZooKeeper
68     private final Set<String> lockNames;// set of active lock names
69
70     /**
71      * Create a MUSIC client that talks to MUSIC on localhost.
72      */
73     public MusicClient() {
74         this("127.0.0.1");
75     }
76
77     /**
78      * Create a MUSIC client that talks to MUSIC on a remote host. The string <i>hosts</i> is a
79      * comma-separated list of IP addresses for remote instances of Cassandra/ZooKeeper.
80      * 
81      * @param hosts the list of hostnames
82      */
83     public MusicClient(String hosts) {
84         music_hosts = hosts.split(",");
85         if (cluster == null) {
86             LOG.debug("Initializing MUSIC Client with endpoints " + hosts);
87             cluster = Cluster.builder().addContactPoints(music_hosts).build();
88         }
89         session = cluster.connect();
90         mls = null;
91         lockNames = new HashSet<String>();
92     }
93
94     /**
95      * Close the connection to MUSIC.
96      */
97     public void close() {
98         if (session != null) {
99             session.close();
100             session = null;
101         }
102         if (cluster != null) {
103             cluster.close();
104             cluster = null;
105         }
106     }
107
108     /**
109      * Be sure to close the connection to MUSIC when this object is GC-ed.
110      */
111     @Override
112     protected void finalize() {
113         close();
114     }
115
116     /**
117      * Return a String representation of the music hosts used by this object.
118      * 
119      * @return the string
120      */
121     @Override
122     public String toString() {
123         List<String> t = Arrays.asList(music_hosts);
124         return "MUSIC hosts=" + t.toString();
125     }
126
127     /**
128      * Create a lock.
129      * 
130      * @see org.onap.music.lockingservice.MusicLockingService#createLock(String)
131      * @param lockName the lock name
132      * @return FILL IN
133      */
134     public String createLock(String lockName) {
135         String ln = "/" + lockName;
136         synchronized (lockNames) {
137             lockNames.add(ln);
138         }
139         return getLockingService().createLockId(ln);
140     }
141
142     /**
143      * Acquire a lock.
144      * 
145      * @see org.onap.music.lockingservice.MusicLockingService#lock(String)
146      * @param lockName the lock name
147      * @return FILL IN
148      */
149     public boolean acquireLock(String lockName) {
150         return getLockingService().isMyTurn(lockName);
151     }
152
153     /**
154      * Get the lock holder.
155      * 
156      * @see org.onap.music.lockingservice.MusicLockingService#currentLockHolder(String)
157      * @param lockName the lock name
158      * @return FILL IN
159      */
160     public String getLockHolder(String lockName) {
161         return getLockingService().whoseTurnIsIt("/" + lockName);
162     }
163
164     /**
165      * Unlock a lock.
166      * 
167      * @see org.onap.music.lockingservice.MusicLockingService#unlock(String)
168      * @param lockName the lock name
169      */
170     public void unlockLock(String lockName) {
171         getLockingService().unlockAndDeleteId(lockName);
172     }
173
174     /**
175      * Delete a lock.
176      * 
177      * @see org.onap.music.lockingservice.MusicLockingService#deleteLock(String)
178      * @param lockName the lock name
179      */
180     public void deleteLock(String lockName) {
181         String ln = "/" + lockName;
182         synchronized (lockNames) {
183             lockNames.remove(ln);
184         }
185         getLockingService().deleteLock(ln);
186     }
187
188     /**
189      * Delete all locks.
190      * 
191      * @see org.onap.music.lockingservice.MusicLockingService#deleteLock(String)
192      * @return true
193      */
194     public boolean deleteAllLocks() {
195         synchronized (lockNames) {
196             for (String lockName : lockNames) {
197                 deleteLock(lockName);
198             }
199             lockNames.clear();
200         }
201         return true;
202     }
203
204     /**
205      * Create a keyspace using the default replication configuration.
206      * 
207      * @param keyspaceName the name of the keyspace
208      * @return always true currently
209      * @throws Exception Cassandra exceptions are passed through
210      */
211     public boolean createKeyspace(String keyspaceName) throws Exception {
212         Map<String, Object> repl = new HashMap<String, Object>();
213         repl.put("class", "SimpleStrategy");
214         repl.put("replication_factor", 1);
215         Map<String, String> consistencyInfo = Collections.singletonMap("type", "eventual");
216         JsonKeySpace jsonKp = new JsonKeySpace();
217         jsonKp.setConsistencyInfo(consistencyInfo);
218         jsonKp.setDurabilityOfWrites("true");
219         jsonKp.setReplicationInfo(repl);
220         return createKeyspace(keyspaceName, jsonKp);
221     }
222
223     public boolean createKeyspace(String keyspaceName, JsonKeySpace kspObject) throws Exception {
224         String consistency = extractConsistencyInfo(keyspaceName, kspObject.getConsistencyInfo());
225         Map<String, Object> replicationInfo = kspObject.getReplicationInfo();
226         String durability = "";
227         if (kspObject.getDurabilityOfWrites() != null)
228             durability = " AND durable_writes = " + kspObject.getDurabilityOfWrites();
229         String query = String.format(
230                         "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = { %s } %s;",
231                         keyspaceName, jsonMaptoSqlString(replicationInfo, ","), durability);
232         LOG.debug(query);
233         executeCreateQuery(query, consistency);
234         return true;
235     }
236
237     public boolean dropKeyspace(String keyspaceName, JsonKeySpace kspObject) throws Exception {
238         String consistency = extractConsistencyInfo(keyspaceName, kspObject.getConsistencyInfo());
239         String query = String.format("DROP KEYSPACE %s;", keyspaceName);
240         LOG.debug(query);
241         executeCreateQuery(query, consistency);
242         return false;
243     }
244
245     public boolean createTable(String tablename, Map<String, String> cols) throws Exception {
246         JsonTable tableObj = new JsonTable();
247         Map<String, String> map = new HashMap<String, String>(); // This should be in the
248                                                                  // consutructor!
249         map.put("type", "eventual");
250         tableObj.setConsistencyInfo(map);
251         return createTable(tablename, cols, tableObj);
252     }
253
254     public boolean createTable(String tablename, Map<String, String> cols, JsonTable tableObj)
255                     throws Exception {
256         // Note: https://docs.datastax.com/en/cql/3.0/cql/cql_reference/create_table_r.html
257
258         // first read the information about the table fields
259         StringBuilder fields = new StringBuilder();
260         String prefix = "";
261         for (String key : cols.keySet()) {
262             fields.append(prefix).append(key).append(" ").append(cols.get(key));
263             prefix = ", ";
264         }
265
266         // information about the name-value style properties
267         // Map<String,Object> propertiesMap = tableObj.getProperties();
268         // String propertiesString="";
269         // if(propertiesMap != null){
270         // counter =0;
271         // for (Map.Entry<String, Object> entry : propertiesMap.entrySet())
272         // {
273         // Object ot = entry.getValue();
274         // String value = ot+"";
275         // if(ot instanceof String){
276         // value = "'"+value+"'";
277         // }else if(ot instanceof Map){
278         // Map<String,Object> otMap = (Map<String,Object>)ot;
279         // value = "{"+jsonMaptoSqlString(otMap, ",")+"}";
280         // }
281         // propertiesString = propertiesString+entry.getKey()+"="+ value+"";
282         // if(counter!=propertiesMap.size()-1)
283         // propertiesString = propertiesString+" AND ";
284         // counter = counter +1;
285         // }
286         // }
287
288         String query = String.format("CREATE TABLE IF NOT EXISTS %s (%s);", tablename,
289                         fields.toString());
290         // if (propertiesMap != null)
291         // query = query + " WITH "+ propertiesString;
292
293         LOG.debug(query);
294         String consistency = extractConsistencyInfo(tablename, tableObj.getConsistencyInfo());
295         executeCreateQuery(query, consistency);
296         return false;
297     }
298
299     public boolean dropTable(String name) {
300         // TODO
301         return false;
302     }
303
304     public boolean insertRow(String name, Map<String, Object> valuesMap) throws Exception {
305         Map<String, String> consistencyInfo = Collections.singletonMap("type", "eventual");
306         return insertRow(name, valuesMap, consistencyInfo, new JsonInsert());
307     }
308
309     public boolean insertRow(String tablename, Map<String, Object> valuesMap,
310                     Map<String, String> consistencyInfo, JsonInsert insObj) throws Exception {
311         // Note: https://docs.datastax.com/en/cql/3.0/cql/cql_reference/insert_r.html
312         String[] parts = tablename.split("\\.");
313         KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(parts[0]);
314         TableMetadata tableInfo = ks.getTable(parts[1]);
315
316         StringBuilder fields = new StringBuilder();
317         StringBuilder values = new StringBuilder();
318         String prefix = "";
319         for (String key : valuesMap.keySet()) {
320             fields.append(prefix).append(key);
321             Object valueObj = valuesMap.get(key);
322             DataType colType = tableInfo.getColumn(key).getType();
323             values.append(prefix).append(convertToSqlDataType(colType, valueObj));
324             prefix = ", ";
325         }
326
327         String suffix = getTTLSuffix(insObj);
328         String query = String.format("INSERT INTO %s (%s) VALUES (%s)%s;", tablename,
329                         fields.toString(), values.toString(), suffix);
330         LOG.debug(query);
331
332         String consistency = extractConsistencyInfo(tablename, consistencyInfo);
333         executeCreateQuery(query, consistency);
334         return false;
335     }
336
337     public boolean lockRow(String name, Map<String, String> cols) {
338         // TODO
339         return false;
340     }
341
342     /**
343      * Select ALL rows in the table.
344      * 
345      * @param tablename the name of the table
346      * @return a list of maps, one map per row
347      */
348     public List<Map<String, Object>> selectRows(final String tablename) {
349         return selectRows(tablename, new HashMap<String, String>());
350     }
351
352     public List<Map<String, Object>> selectRows(final String tablename, Map<String, String> cols) {
353         String ns = "";
354         String tbl = tablename;
355         int ix = tbl.indexOf('.');
356         if (ix >= 0) {
357             ns = tablename.substring(0, ix);
358             tbl = tablename.substring(ix + 1);
359         }
360         Select sel = QueryBuilder.select().all().from(ns, tbl);
361         Statement stmt = sel;
362         if (cols.size() == 1) {
363             // only handles 1 WHERE value right now
364             String k = cols.keySet().iterator().next();
365             Clause eqclause = QueryBuilder.eq(k, cols.get(k));
366             stmt = sel.where(eqclause);
367         }
368         ResultSet resultset = session.execute(stmt);
369         List<Map<String, Object>> results = new ArrayList<Map<String, Object>>();
370         for (Row row : resultset) {
371             Map<String, Object> map = new HashMap<String, Object>();
372             for (Definition definition : row.getColumnDefinitions()) {
373                 map.put(definition.getName(),
374                                 readRow(row, definition.getName(), definition.getType()));
375             }
376             results.add(map);
377         }
378         return results;
379     }
380
381     private Object readRow(final Row row, final String name, final DataType colType) {
382         switch (colType.getName()) {
383             case BIGINT:
384                 return row.getLong(name);
385             case BOOLEAN:
386                 return row.getBool(name);
387             case DOUBLE:
388                 return row.getDouble(name);
389             case FLOAT:
390                 return row.getFloat(name);
391             case INT:
392                 return row.getInt(name);
393             case MAP:
394                 return row.getMap(name, String.class, String.class);
395             case UUID:
396                 return row.getUUID(name);
397             case TEXT:
398             case VARCHAR:
399                 return row.getString(name);
400             case VARINT:
401                 return row.getVarint(name);
402             // These are not supported right now....
403             // ASCII
404             // BLOB
405             // COUNTER
406             // CUSTOM
407             // DECIMAL
408             // INET
409             // LIST
410             // SET
411             // TIMESTAMP
412             // TIMEUUID
413             // TUPLE
414             // UDT
415             default:
416                 return null;
417         }
418     }
419
420     @Deprecated
421     public List<Map<String, String>> OLDselectRows(String tablename, Map<String, String> cols) {
422         String query = String.format("SELECT * FROM %s", tablename);
423         if (cols.size() > 0) {
424             // add WHERE clause
425             // String[] parts = tablename.split("\\.");
426             // KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(parts[0]);
427             // TableMetadata tableInfo = ks.getTable(parts[1]);
428             String whereclause = " WHERE";
429             String prefix = "";
430             for (String key : cols.keySet()) {
431                 String val = cols.get(key);
432                 // DataType colType = tableInfo.getColumn(key).getType();
433                 whereclause = String.format("%s%s %s = '%s'", whereclause, prefix, key, val);
434                 prefix = " AND";
435             }
436             query += whereclause;
437         }
438         LOG.debug(query);
439         ResultSet resultset = session.execute(query);
440         List<Map<String, String>> results = new ArrayList<Map<String, String>>();
441         for (Row row : resultset) {
442             ColumnDefinitions colInfo = row.getColumnDefinitions();
443             Map<String, String> map = new HashMap<String, String>();
444             for (Definition definition : colInfo) {
445                 // map.put(definition.getName(), (String)MusicDataStore.readRow(row,
446                 // definition.getName(), definition.getType()));
447             }
448             results.add(map);
449         }
450         return results;
451     }
452
453     public void updateRows(String tablename, Map<String, String> cols, Map<String, Object> vals)
454                     throws Exception {
455         Map<String, String> consistencyInfo = Collections.singletonMap("type", "eventual");
456         updateRows(tablename, cols, vals, consistencyInfo, new JsonInsert());
457     }
458
459     public void updateRows(String tablename, Map<String, String> cols, Map<String, Object> vals,
460                     Map<String, String> consistencyInfo, JsonInsert insObj) throws Exception {
461         // https://docs.datastax.com/en/cql/3.0/cql/cql_reference/update_r.html
462
463         // obtain the field value pairs of the update
464         String[] parts = tablename.split("\\.");
465         KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(parts[0]);
466         TableMetadata tableInfo = ks.getTable(parts[1]);
467
468         StringBuilder fields = new StringBuilder();
469         String prefix = "";
470         for (String key : vals.keySet()) {
471             Object valueObj = vals.get(key);
472             String valueString = convertToSqlDataType(tableInfo.getColumn(key).getType(), valueObj);
473             fields.append(prefix).append(key).append(" = ").append(valueString);
474             prefix = ", ";
475         }
476
477         // get the row specifier
478         StringBuilder rows = new StringBuilder();
479         String primaryKey = "";
480         prefix = "";
481         for (String key : cols.keySet()) {
482             String indValue = cols.get(key);
483             DataType colType = tableInfo.getColumn(key).getType();
484             String formattedValue = convertToSqlDataType(colType, indValue);
485             primaryKey = primaryKey + indValue;
486             rows.append(prefix).append(key).append(" = ").append(formattedValue);
487             prefix = " AND ";
488         }
489
490         String using = getTTLSuffix(insObj);
491         String query = String.format("UPDATE %s%s SET %s WHERE %s;", tablename, using,
492                         fields.toString(), rows.toString());
493         LOG.debug(query);
494
495         String consistency = extractConsistencyInfo(tablename, consistencyInfo);
496         executeCreateQuery(query, consistency);
497     }
498
499     public void deleteRows(String tablename, Map<String, String> cols) {
500         String ns = "";
501         String tbl = tablename;
502         int ix = tbl.indexOf('.');
503         if (ix >= 0) {
504             ns = tablename.substring(0, ix);
505             tbl = tablename.substring(ix + 1);
506         }
507         Delete stmt = QueryBuilder.delete().from(ns, tbl);
508         if (cols.size() == 1) {
509             // only handles 1 WHERE value right now
510             String k = cols.keySet().iterator().next();
511             Clause eqclause = QueryBuilder.eq(k, cols.get(k));
512             session.execute(stmt.where(eqclause));
513         } else {
514             session.execute(stmt);
515         }
516     }
517
518     private String getTTLSuffix(JsonInsert insObj) {
519         String ttl = insObj.getTtl();
520         String timestamp = insObj.getTimestamp();
521         if (ttl != null && ttl.length() > 0) {
522             if (timestamp != null && timestamp.length() > 0) {
523                 return " USING TTL " + ttl + " AND TIMESTAMP " + timestamp;
524             } else {
525                 return " USING TTL " + ttl;
526             }
527         } else if (timestamp != null && timestamp.length() > 0) {
528             return " USING TIMESTAMP " + timestamp;
529         }
530         return "";
531     }
532
533     private MusicLockingService getLockingService() {
534         if (mls == null) {
535             mls = new MusicLockingService(music_hosts[0]);
536         }
537         return mls;
538     }
539
540     private String extractConsistencyInfo(String key, Map<String, String> consistencyInfo)
541                     throws Exception {
542         String consistency = "";
543         if (consistencyInfo.get("type").equalsIgnoreCase("atomic")) {
544             String lockId = consistencyInfo.get("lockId");
545             String lockName = lockId.substring(lockId.indexOf("$") + 1);
546             lockName = lockName.substring(0, lockName.indexOf("$"));
547
548             // first ensure that the lock name is correct before seeing if it has access
549             if (!lockName.equalsIgnoreCase(key))
550                 throw new Exception("THIS LOCK IS NOT FOR THE KEY: " + key);
551
552             String lockStatus = getLockingService().isMyTurn(lockId) + "";
553             if (lockStatus.equalsIgnoreCase("false"))
554                 throw new Exception("YOU DO NOT HAVE THE LOCK");
555             return "atomic";
556         }
557         if (consistencyInfo.get("type").equalsIgnoreCase("eventual"))
558             return "eventual";
559         throw new Exception("Consistency type " + consistency + " unknown!!");
560     }
561
562     // utility function to parse json map into sql like string
563     private String jsonMaptoSqlString(Map<String, Object> jMap, String lineDelimiter) {
564         String sql = "";
565         String prefix = "";
566         for (Map.Entry<String, Object> entry : jMap.entrySet()) {
567             Object ot = entry.getValue();
568             String value = ot + "";
569             if (ot instanceof String) {
570                 value = "'" + value + "'";
571             }
572             sql = String.format("%s%s'%s': %s", sql, prefix, entry.getKey(), value);
573             prefix = lineDelimiter;
574         }
575         return sql;
576     }
577
578     private String convertToSqlDataType(DataType type, Object valueObj) {
579         switch (type.getName()) {
580             case TEXT:
581                 String t = valueObj.toString();
582                 t = t.replaceAll("'", "''");
583                 return "'" + t + "'";
584             case MAP:
585                 @SuppressWarnings("unchecked")
586                 Map<String, Object> otMap = (Map<String, Object>) valueObj;
587                 return "{" + jsonMaptoSqlString(otMap, ",") + "}";
588             default:
589             case UUID:
590                 return valueObj.toString();
591         }
592     }
593
594     private void executeCreateQuery(String query, String consistency) throws Exception {
595         Statement statement = new SimpleStatement(query);
596         if (consistency.equalsIgnoreCase("atomic"))
597             statement.setConsistencyLevel(ConsistencyLevel.ALL);
598         else if (consistency.equalsIgnoreCase("eventual"))
599             statement.setConsistencyLevel(ConsistencyLevel.ONE);
600         else
601             throw new Exception("Consistency level " + consistency + " unknown!!");
602         session.execute(statement);
603     }
604 }