e67c98a43014db65c99a5a04648acaf8ad2f753d
[music.git] / src / main / java / org / onap / music / client / MusicClient.java
1 /*
2  * ============LICENSE_START==========================================
3  * org.onap.music
4  * ===================================================================
5  *  Copyright (c) 2017 AT&T Intellectual Property
6  * ===================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  * 
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  * 
19  * ============LICENSE_END=============================================
20  * ====================================================================
21  */
22 package org.onap.music.client;
23
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.Collections;
27 import java.util.HashMap;
28 import java.util.HashSet;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Set;
32 import org.onap.music.datastore.jsonobjects.JsonInsert;
33 import org.onap.music.datastore.jsonobjects.JsonKeySpace;
34 import org.onap.music.datastore.jsonobjects.JsonTable;
35 import org.onap.music.eelf.logging.EELFLoggerDelegate;
36 import org.onap.music.lockingservice.MusicLockingService;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39 import com.datastax.driver.core.Cluster;
40 import com.datastax.driver.core.ColumnDefinitions;
41 import com.datastax.driver.core.ColumnDefinitions.Definition;
42 import com.datastax.driver.core.ConsistencyLevel;
43 import com.datastax.driver.core.DataType;
44 import com.datastax.driver.core.KeyspaceMetadata;
45 import com.datastax.driver.core.ResultSet;
46 import com.datastax.driver.core.Row;
47 import com.datastax.driver.core.Session;
48 import com.datastax.driver.core.SimpleStatement;
49 import com.datastax.driver.core.Statement;
50 import com.datastax.driver.core.TableMetadata;
51 import com.datastax.driver.core.querybuilder.Clause;
52 import com.datastax.driver.core.querybuilder.Delete;
53 import com.datastax.driver.core.querybuilder.QueryBuilder;
54 import com.datastax.driver.core.querybuilder.Select;
55
56 /**
57  * A MUSIC client that talks directly to Cassandra/ZooKeeper. This was taken, and slightly modified,
58  * from the REST version of the code.
59  *
60  * @author Robert Eby
61  */
62 public class MusicClient {
63     private EELFLoggerDelegate LOG = EELFLoggerDelegate.getLogger(MusicClient.class);
64
65     private final String[] music_hosts; // array of hosts in the music cluster
66     private Cluster cluster; // MUSIC Cassandra cluster
67     private Session session; // MUSIC Cassandra session
68     private MusicLockingService mls; // ZooKeeper
69     private final Set<String> lockNames;// set of active lock names
70
71     /**
72      * Create a MUSIC client that talks to MUSIC on localhost.
73      */
74     public MusicClient() {
75         this("127.0.0.1");
76     }
77
78     /**
79      * Create a MUSIC client that talks to MUSIC on a remote host. The string <i>hosts</i> is a
80      * comma-separated list of IP addresses for remote instances of Cassandra/ZooKeeper.
81      * 
82      * @param hosts the list of hostnames
83      */
84     public MusicClient(String hosts) {
85         music_hosts = hosts.split(",");
86         if (cluster == null) {
87             LOG.debug("Initializing MUSIC Client with endpoints " + hosts);
88             cluster = Cluster.builder().addContactPoints(music_hosts).build();
89         }
90         session = cluster.connect();
91         mls = null;
92         lockNames = new HashSet<String>();
93     }
94
95     /**
96      * Close the connection to MUSIC.
97      */
98     public void close() {
99         if (session != null) {
100             session.close();
101             session = null;
102         }
103         if (cluster != null) {
104             cluster.close();
105             cluster = null;
106         }
107     }
108
109     /**
110      * Be sure to close the connection to MUSIC when this object is GC-ed.
111      */
112     @Override
113     protected void finalize() {
114         close();
115     }
116
117     /**
118      * Return a String representation of the music hosts used by this object.
119      * 
120      * @return the string
121      */
122     @Override
123     public String toString() {
124         List<String> t = Arrays.asList(music_hosts);
125         return "MUSIC hosts=" + t.toString();
126     }
127
128     /**
129      * Create a lock.
130      * 
131      * @see org.onap.music.lockingservice.MusicLockingService#createLock(String)
132      * @param lockName the lock name
133      * @return FILL IN
134      */
135     public String createLock(String lockName) {
136         String ln = "/" + lockName;
137         synchronized (lockNames) {
138             lockNames.add(ln);
139         }
140         return getLockingService().createLockId(ln);
141     }
142
143     /**
144      * Acquire a lock.
145      * 
146      * @see org.onap.music.lockingservice.MusicLockingService#lock(String)
147      * @param lockName the lock name
148      * @return FILL IN
149      */
150     public boolean acquireLock(String lockName) {
151         return getLockingService().isMyTurn(lockName);
152     }
153
154     /**
155      * Get the lock holder.
156      * 
157      * @see org.onap.music.lockingservice.MusicLockingService#currentLockHolder(String)
158      * @param lockName the lock name
159      * @return FILL IN
160      */
161     public String getLockHolder(String lockName) {
162         return getLockingService().whoseTurnIsIt("/" + lockName);
163     }
164
165     /**
166      * Unlock a lock.
167      * 
168      * @see org.onap.music.lockingservice.MusicLockingService#unlock(String)
169      * @param lockName the lock name
170      */
171     public void unlockLock(String lockName) {
172         getLockingService().unlockAndDeleteId(lockName);
173     }
174
175     /**
176      * Delete a lock.
177      * 
178      * @see org.onap.music.lockingservice.MusicLockingService#deleteLock(String)
179      * @param lockName the lock name
180      */
181     public void deleteLock(String lockName) {
182         String ln = "/" + lockName;
183         synchronized (lockNames) {
184             lockNames.remove(ln);
185         }
186         getLockingService().deleteLock(ln);
187     }
188
189     /**
190      * Delete all locks.
191      * 
192      * @see org.onap.music.lockingservice.MusicLockingService#deleteLock(String)
193      * @return true
194      */
195     public boolean deleteAllLocks() {
196         synchronized (lockNames) {
197             for (String lockName : lockNames) {
198                 deleteLock(lockName);
199             }
200             lockNames.clear();
201         }
202         return true;
203     }
204
205     /**
206      * Create a keyspace using the default replication configuration.
207      * 
208      * @param keyspaceName the name of the keyspace
209      * @return always true currently
210      * @throws Exception Cassandra exceptions are passed through
211      */
212     public boolean createKeyspace(String keyspaceName) throws Exception {
213         Map<String, Object> repl = new HashMap<String, Object>();
214         repl.put("class", "SimpleStrategy");
215         repl.put("replication_factor", 1);
216         Map<String, String> consistencyInfo = Collections.singletonMap("type", "eventual");
217         JsonKeySpace jsonKp = new JsonKeySpace();
218         jsonKp.setConsistencyInfo(consistencyInfo);
219         jsonKp.setDurabilityOfWrites("true");
220         jsonKp.setReplicationInfo(repl);
221         return createKeyspace(keyspaceName, jsonKp);
222     }
223
224     public boolean createKeyspace(String keyspaceName, JsonKeySpace kspObject) throws Exception {
225         String consistency = extractConsistencyInfo(keyspaceName, kspObject.getConsistencyInfo());
226         Map<String, Object> replicationInfo = kspObject.getReplicationInfo();
227         String durability = "";
228         if (kspObject.getDurabilityOfWrites() != null)
229             durability = " AND durable_writes = " + kspObject.getDurabilityOfWrites();
230         String query = String.format(
231                         "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = { %s } %s;",
232                         keyspaceName, jsonMaptoSqlString(replicationInfo, ","), durability);
233         LOG.info(EELFLoggerDelegate.applicationLogger,query);
234         executeCreateQuery(query, consistency);
235         return true;
236     }
237
238     public boolean dropKeyspace(String keyspaceName, JsonKeySpace kspObject) throws Exception {
239         String consistency = extractConsistencyInfo(keyspaceName, kspObject.getConsistencyInfo());
240         String query = String.format("DROP KEYSPACE %s;", keyspaceName);
241         LOG.info(EELFLoggerDelegate.applicationLogger,query);
242         executeCreateQuery(query, consistency);
243         return false;
244     }
245
246     public boolean createTable(String tablename, Map<String, String> cols) throws Exception {
247         JsonTable tableObj = new JsonTable();
248         Map<String, String> map = new HashMap<String, String>(); // This should be in the
249                                                                  // consutructor!
250         map.put("type", "eventual");
251         tableObj.setConsistencyInfo(map);
252         return createTable(tablename, cols, tableObj);
253     }
254
255     public boolean createTable(String tablename, Map<String, String> cols, JsonTable tableObj)
256                     throws Exception {
257         // Note: https://docs.datastax.com/en/cql/3.0/cql/cql_reference/create_table_r.html
258
259         // first read the information about the table fields
260         StringBuilder fields = new StringBuilder();
261         String prefix = "";
262         for (String key : cols.keySet()) {
263             fields.append(prefix).append(key).append(" ").append(cols.get(key));
264             prefix = ", ";
265         }
266
267         String query = String.format("CREATE TABLE IF NOT EXISTS %s (%s);", tablename,
268                         fields.toString());
269         
270         LOG.debug(query);
271         String consistency = extractConsistencyInfo(tablename, tableObj.getConsistencyInfo());
272         executeCreateQuery(query, consistency);
273         return false;
274     }
275
276     public boolean dropTable(String name) {
277         // TODO
278         return false;
279     }
280
281     public boolean insertRow(String name, Map<String, Object> valuesMap) throws Exception {
282         Map<String, String> consistencyInfo = Collections.singletonMap("type", "eventual");
283         return insertRow(name, valuesMap, consistencyInfo, new JsonInsert());
284     }
285
286     public boolean insertRow(String tablename, Map<String, Object> valuesMap,
287                     Map<String, String> consistencyInfo, JsonInsert insObj) throws Exception {
288         // Note: https://docs.datastax.com/en/cql/3.0/cql/cql_reference/insert_r.html
289         String[] parts = tablename.split("\\.");
290         KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(parts[0]);
291         TableMetadata tableInfo = ks.getTable(parts[1]);
292
293         StringBuilder fields = new StringBuilder();
294         StringBuilder values = new StringBuilder();
295         String prefix = "";
296         for (String key : valuesMap.keySet()) {
297             fields.append(prefix).append(key);
298             Object valueObj = valuesMap.get(key);
299             DataType colType = tableInfo.getColumn(key).getType();
300             values.append(prefix).append(convertToSqlDataType(colType, valueObj));
301             prefix = ", ";
302         }
303
304         String suffix = getTTLSuffix(insObj);
305         String query = String.format("INSERT INTO %s (%s) VALUES (%s)%s;", tablename,
306                         fields.toString(), values.toString(), suffix);
307         LOG.info(EELFLoggerDelegate.applicationLogger,query);
308
309         String consistency = extractConsistencyInfo(tablename, consistencyInfo);
310         executeCreateQuery(query, consistency);
311         return false;
312     }
313
314     public boolean lockRow(String name, Map<String, String> cols) {
315         // TODO
316         return false;
317     }
318
319     /**
320      * Select ALL rows in the table.
321      * 
322      * @param tablename the name of the table
323      * @return a list of maps, one map per row
324      */
325     public List<Map<String, Object>> selectRows(final String tablename) {
326         return selectRows(tablename, new HashMap<String, String>());
327     }
328
329     public List<Map<String, Object>> selectRows(final String tablename, Map<String, String> cols) {
330         String ns = "";
331         String tbl = tablename;
332         int ix = tbl.indexOf('.');
333         if (ix >= 0) {
334             ns = tablename.substring(0, ix);
335             tbl = tablename.substring(ix + 1);
336         }
337         Select sel = QueryBuilder.select().all().from(ns, tbl);
338         Statement stmt = sel;
339         if (cols.size() == 1) {
340             // only handles 1 WHERE value right now
341             String k = cols.keySet().iterator().next();
342             Clause eqclause = QueryBuilder.eq(k, cols.get(k));
343             stmt = sel.where(eqclause);
344         }
345         ResultSet resultset = session.execute(stmt);
346         List<Map<String, Object>> results = new ArrayList<Map<String, Object>>();
347         for (Row row : resultset) {
348             Map<String, Object> map = new HashMap<String, Object>();
349             for (Definition definition : row.getColumnDefinitions()) {
350                 map.put(definition.getName(),
351                                 readRow(row, definition.getName(), definition.getType()));
352             }
353             results.add(map);
354         }
355         return results;
356     }
357
358     private Object readRow(final Row row, final String name, final DataType colType) {
359         switch (colType.getName()) {
360             case BIGINT:
361                 return row.getLong(name);
362             case BOOLEAN:
363                 return row.getBool(name);
364             case DOUBLE:
365                 return row.getDouble(name);
366             case FLOAT:
367                 return row.getFloat(name);
368             case INT:
369                 return row.getInt(name);
370             case MAP:
371                 return row.getMap(name, String.class, String.class);
372             case UUID:
373                 return row.getUUID(name);
374             case TEXT:
375             case VARCHAR:
376                 return row.getString(name);
377             case VARINT:
378                 return row.getVarint(name);
379             // These are not supported right now....
380             // ASCII
381             // BLOB
382             // COUNTER
383             // CUSTOM
384             // DECIMAL
385             // INET
386             // LIST
387             // SET
388             // TIMESTAMP
389             // TIMEUUID
390             // TUPLE
391             // UDT
392             default:
393                 return null;
394         }
395     }
396
397     @Deprecated
398     public List<Map<String, String>> OLDselectRows(String tablename, Map<String, String> cols) {
399         String query = String.format("SELECT * FROM %s", tablename);
400         if (cols.size() > 0) {
401             // add WHERE clause
402             // String[] parts = tablename.split("\\.");
403             // KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(parts[0]);
404             // TableMetadata tableInfo = ks.getTable(parts[1]);
405             String whereclause = " WHERE";
406             String prefix = "";
407             for (String key : cols.keySet()) {
408                 String val = cols.get(key);
409                 // DataType colType = tableInfo.getColumn(key).getType();
410                 whereclause = String.format("%s%s %s = '%s'", whereclause, prefix, key, val);
411                 prefix = " AND";
412             }
413             query += whereclause;
414         }
415         LOG.debug(query);
416         ResultSet resultset = session.execute(query);
417         List<Map<String, String>> results = new ArrayList<Map<String, String>>();
418         for (Row row : resultset) {
419             ColumnDefinitions colInfo = row.getColumnDefinitions();
420             Map<String, String> map = new HashMap<String, String>();
421             for (Definition definition : colInfo) {
422                 // map.put(definition.getName(), (String)MusicDataStore.readRow(row,
423                 // definition.getName(), definition.getType()));
424             }
425             results.add(map);
426         }
427         return results;
428     }
429
430     public void updateRows(String tablename, Map<String, String> cols, Map<String, Object> vals)
431                     throws Exception {
432         Map<String, String> consistencyInfo = Collections.singletonMap("type", "eventual");
433         updateRows(tablename, cols, vals, consistencyInfo, new JsonInsert());
434     }
435
436     public void updateRows(String tablename, Map<String, String> cols, Map<String, Object> vals,
437                     Map<String, String> consistencyInfo, JsonInsert insObj) throws Exception {
438         // https://docs.datastax.com/en/cql/3.0/cql/cql_reference/update_r.html
439
440         // obtain the field value pairs of the update
441         String[] parts = tablename.split("\\.");
442         KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(parts[0]);
443         TableMetadata tableInfo = ks.getTable(parts[1]);
444
445         StringBuilder fields = new StringBuilder();
446         String prefix = "";
447         for (String key : vals.keySet()) {
448             Object valueObj = vals.get(key);
449             String valueString = convertToSqlDataType(tableInfo.getColumn(key).getType(), valueObj);
450             fields.append(prefix).append(key).append(" = ").append(valueString);
451             prefix = ", ";
452         }
453
454         // get the row specifier
455         StringBuilder rows = new StringBuilder();
456         String primaryKey = "";
457         prefix = "";
458         for (String key : cols.keySet()) {
459             String indValue = cols.get(key);
460             DataType colType = tableInfo.getColumn(key).getType();
461             String formattedValue = convertToSqlDataType(colType, indValue);
462             primaryKey = primaryKey + indValue;
463             rows.append(prefix).append(key).append(" = ").append(formattedValue);
464             prefix = " AND ";
465         }
466
467         String using = getTTLSuffix(insObj);
468         String query = String.format("UPDATE %s%s SET %s WHERE %s;", tablename, using,
469                         fields.toString(), rows.toString());
470         LOG.debug(query);
471
472         String consistency = extractConsistencyInfo(tablename, consistencyInfo);
473         executeCreateQuery(query, consistency);
474     }
475
476     public void deleteRows(String tablename, Map<String, String> cols) {
477         String ns = "";
478         String tbl = tablename;
479         int ix = tbl.indexOf('.');
480         if (ix >= 0) {
481             ns = tablename.substring(0, ix);
482             tbl = tablename.substring(ix + 1);
483         }
484         Delete stmt = QueryBuilder.delete().from(ns, tbl);
485         if (cols.size() == 1) {
486             // only handles 1 WHERE value right now
487             String k = cols.keySet().iterator().next();
488             Clause eqclause = QueryBuilder.eq(k, cols.get(k));
489             session.execute(stmt.where(eqclause));
490         } else {
491             session.execute(stmt);
492         }
493     }
494
495     private String getTTLSuffix(JsonInsert insObj) {
496         String ttl = insObj.getTtl();
497         String timestamp = insObj.getTimestamp();
498         if (ttl != null && ttl.length() > 0) {
499             if (timestamp != null && timestamp.length() > 0) {
500                 return " USING TTL " + ttl + " AND TIMESTAMP " + timestamp;
501             } else {
502                 return " USING TTL " + ttl;
503             }
504         } else if (timestamp != null && timestamp.length() > 0) {
505             return " USING TIMESTAMP " + timestamp;
506         }
507         return "";
508     }
509
510     private MusicLockingService getLockingService() {
511         if (mls == null) {
512             mls = new MusicLockingService(music_hosts[0]);
513         }
514         return mls;
515     }
516
517     private String extractConsistencyInfo(String key, Map<String, String> consistencyInfo)
518                     throws Exception {
519         String consistency = "";
520         if (consistencyInfo.get("type").equalsIgnoreCase("atomic")) {
521             String lockId = consistencyInfo.get("lockId");
522             String lockName = lockId.substring(lockId.indexOf("$") + 1);
523             lockName = lockName.substring(0, lockName.indexOf("$"));
524
525             // first ensure that the lock name is correct before seeing if it has access
526             if (!lockName.equalsIgnoreCase(key))
527                 throw new Exception("THIS LOCK IS NOT FOR THE KEY: " + key);
528
529             String lockStatus = getLockingService().isMyTurn(lockId) + "";
530             if (lockStatus.equalsIgnoreCase("false"))
531                 throw new Exception("YOU DO NOT HAVE THE LOCK");
532             return "atomic";
533         }
534         if (consistencyInfo.get("type").equalsIgnoreCase("eventual"))
535             return "eventual";
536         throw new Exception("Consistency type " + consistency + " unknown!!");
537     }
538
539     // utility function to parse json map into sql like string
540     private String jsonMaptoSqlString(Map<String, Object> jMap, String lineDelimiter) {
541         String sql = "";
542         String prefix = "";
543         for (Map.Entry<String, Object> entry : jMap.entrySet()) {
544             Object ot = entry.getValue();
545             String value = ot + "";
546             if (ot instanceof String) {
547                 value = "'" + value + "'";
548             }
549             sql = String.format("%s%s'%s': %s", sql, prefix, entry.getKey(), value);
550             prefix = lineDelimiter;
551         }
552         return sql;
553     }
554
555     private String convertToSqlDataType(DataType type, Object valueObj) {
556         switch (type.getName()) {
557             case TEXT:
558                 String t = valueObj.toString();
559                 t = t.replaceAll("'", "''");
560                 return "'" + t + "'";
561             case MAP:
562                 @SuppressWarnings("unchecked")
563                 Map<String, Object> otMap = (Map<String, Object>) valueObj;
564                 return "{" + jsonMaptoSqlString(otMap, ",") + "}";
565             default:
566             case UUID:
567                 return valueObj.toString();
568         }
569     }
570
571     private void executeCreateQuery(String query, String consistency) throws Exception {
572         Statement statement = new SimpleStatement(query);
573         if (consistency.equalsIgnoreCase("atomic"))
574             statement.setConsistencyLevel(ConsistencyLevel.ALL);
575         else if (consistency.equalsIgnoreCase("eventual"))
576             statement.setConsistencyLevel(ConsistencyLevel.ONE);
577         else
578             throw new Exception("Consistency level " + consistency + " unknown!!");
579         session.execute(statement);
580     }
581 }