2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * ===================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 * ============LICENSE_END=============================================
20 * ====================================================================
22 package org.onap.music.client;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.Collections;
27 import java.util.HashMap;
28 import java.util.HashSet;
29 import java.util.List;
32 import org.onap.music.datastore.jsonobjects.JsonInsert;
33 import org.onap.music.datastore.jsonobjects.JsonKeySpace;
34 import org.onap.music.datastore.jsonobjects.JsonTable;
35 import org.onap.music.lockingservice.MusicLockingService;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38 import com.datastax.driver.core.Cluster;
39 import com.datastax.driver.core.ColumnDefinitions;
40 import com.datastax.driver.core.ColumnDefinitions.Definition;
41 import com.datastax.driver.core.ConsistencyLevel;
42 import com.datastax.driver.core.DataType;
43 import com.datastax.driver.core.KeyspaceMetadata;
44 import com.datastax.driver.core.ResultSet;
45 import com.datastax.driver.core.Row;
46 import com.datastax.driver.core.Session;
47 import com.datastax.driver.core.SimpleStatement;
48 import com.datastax.driver.core.Statement;
49 import com.datastax.driver.core.TableMetadata;
50 import com.datastax.driver.core.querybuilder.Clause;
51 import com.datastax.driver.core.querybuilder.Delete;
52 import com.datastax.driver.core.querybuilder.QueryBuilder;
53 import com.datastax.driver.core.querybuilder.Select;
56 * A MUSIC client that talks directly to Cassandra/ZooKeeper. This was taken, and slightly modified,
57 * from the REST version of the code.
61 public class MusicClient {
62 private static final Logger LOG = LoggerFactory.getLogger(MusicClient.class);
64 private final String[] music_hosts; // array of hosts in the music cluster
65 private Cluster cluster; // MUSIC Cassandra cluster
66 private Session session; // MUSIC Cassandra session
67 private MusicLockingService mls; // ZooKeeper
68 private final Set<String> lockNames;// set of active lock names
71 * Create a MUSIC client that talks to MUSIC on localhost.
73 public MusicClient() {
78 * Create a MUSIC client that talks to MUSIC on a remote host. The string <i>hosts</i> is a
79 * comma-separated list of IP addresses for remote instances of Cassandra/ZooKeeper.
81 * @param hosts the list of hostnames
83 public MusicClient(String hosts) {
84 music_hosts = hosts.split(",");
85 if (cluster == null) {
86 LOG.debug("Initializing MUSIC Client with endpoints " + hosts);
87 cluster = Cluster.builder().addContactPoints(music_hosts).build();
89 session = cluster.connect();
91 lockNames = new HashSet<String>();
95 * Close the connection to MUSIC.
98 if (session != null) {
102 if (cluster != null) {
109 * Be sure to close the connection to MUSIC when this object is GC-ed.
112 protected void finalize() {
117 * Return a String representation of the music hosts used by this object.
122 public String toString() {
123 List<String> t = Arrays.asList(music_hosts);
124 return "MUSIC hosts=" + t.toString();
130 * @see org.onap.music.lockingservice.MusicLockingService#createLock(String)
131 * @param lockName the lock name
134 public String createLock(String lockName) {
135 String ln = "/" + lockName;
136 synchronized (lockNames) {
139 return getLockingService().createLockId(ln);
145 * @see org.onap.music.lockingservice.MusicLockingService#lock(String)
146 * @param lockName the lock name
149 public boolean acquireLock(String lockName) {
150 return getLockingService().isMyTurn(lockName);
154 * Get the lock holder.
156 * @see org.onap.music.lockingservice.MusicLockingService#currentLockHolder(String)
157 * @param lockName the lock name
160 public String getLockHolder(String lockName) {
161 return getLockingService().whoseTurnIsIt("/" + lockName);
167 * @see org.onap.music.lockingservice.MusicLockingService#unlock(String)
168 * @param lockName the lock name
170 public void unlockLock(String lockName) {
171 getLockingService().unlockAndDeleteId(lockName);
177 * @see org.onap.music.lockingservice.MusicLockingService#deleteLock(String)
178 * @param lockName the lock name
180 public void deleteLock(String lockName) {
181 String ln = "/" + lockName;
182 synchronized (lockNames) {
183 lockNames.remove(ln);
185 getLockingService().deleteLock(ln);
191 * @see org.onap.music.lockingservice.MusicLockingService#deleteLock(String)
194 public boolean deleteAllLocks() {
195 synchronized (lockNames) {
196 for (String lockName : lockNames) {
197 deleteLock(lockName);
205 * Create a keyspace using the default replication configuration.
207 * @param keyspaceName the name of the keyspace
208 * @return always true currently
209 * @throws Exception Cassandra exceptions are passed through
211 public boolean createKeyspace(String keyspaceName) throws Exception {
212 Map<String, Object> repl = new HashMap<String, Object>();
213 repl.put("class", "SimpleStrategy");
214 repl.put("replication_factor", 1);
215 Map<String, String> consistencyInfo = Collections.singletonMap("type", "eventual");
216 JsonKeySpace jsonKp = new JsonKeySpace();
217 jsonKp.setConsistencyInfo(consistencyInfo);
218 jsonKp.setDurabilityOfWrites("true");
219 jsonKp.setReplicationInfo(repl);
220 return createKeyspace(keyspaceName, jsonKp);
223 public boolean createKeyspace(String keyspaceName, JsonKeySpace kspObject) throws Exception {
224 String consistency = extractConsistencyInfo(keyspaceName, kspObject.getConsistencyInfo());
225 Map<String, Object> replicationInfo = kspObject.getReplicationInfo();
226 String durability = "";
227 if (kspObject.getDurabilityOfWrites() != null)
228 durability = " AND durable_writes = " + kspObject.getDurabilityOfWrites();
229 String query = String.format(
230 "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = { %s } %s;",
231 keyspaceName, jsonMaptoSqlString(replicationInfo, ","), durability);
233 executeCreateQuery(query, consistency);
237 public boolean dropKeyspace(String keyspaceName, JsonKeySpace kspObject) throws Exception {
238 String consistency = extractConsistencyInfo(keyspaceName, kspObject.getConsistencyInfo());
239 String query = String.format("DROP KEYSPACE %s;", keyspaceName);
241 executeCreateQuery(query, consistency);
245 public boolean createTable(String tablename, Map<String, String> cols) throws Exception {
246 JsonTable tableObj = new JsonTable();
247 Map<String, String> map = new HashMap<String, String>(); // This should be in the
249 map.put("type", "eventual");
250 tableObj.setConsistencyInfo(map);
251 return createTable(tablename, cols, tableObj);
254 public boolean createTable(String tablename, Map<String, String> cols, JsonTable tableObj)
256 // Note: https://docs.datastax.com/en/cql/3.0/cql/cql_reference/create_table_r.html
258 // first read the information about the table fields
259 StringBuilder fields = new StringBuilder();
261 for (String key : cols.keySet()) {
262 fields.append(prefix).append(key).append(" ").append(cols.get(key));
266 // information about the name-value style properties
267 // Map<String,Object> propertiesMap = tableObj.getProperties();
268 // String propertiesString="";
269 // if(propertiesMap != null){
271 // for (Map.Entry<String, Object> entry : propertiesMap.entrySet())
273 // Object ot = entry.getValue();
274 // String value = ot+"";
275 // if(ot instanceof String){
276 // value = "'"+value+"'";
277 // }else if(ot instanceof Map){
278 // Map<String,Object> otMap = (Map<String,Object>)ot;
279 // value = "{"+jsonMaptoSqlString(otMap, ",")+"}";
281 // propertiesString = propertiesString+entry.getKey()+"="+ value+"";
282 // if(counter!=propertiesMap.size()-1)
283 // propertiesString = propertiesString+" AND ";
284 // counter = counter +1;
288 String query = String.format("CREATE TABLE IF NOT EXISTS %s (%s);", tablename,
290 // if (propertiesMap != null)
291 // query = query + " WITH "+ propertiesString;
294 String consistency = extractConsistencyInfo(tablename, tableObj.getConsistencyInfo());
295 executeCreateQuery(query, consistency);
299 public boolean dropTable(String name) {
304 public boolean insertRow(String name, Map<String, Object> valuesMap) throws Exception {
305 Map<String, String> consistencyInfo = Collections.singletonMap("type", "eventual");
306 return insertRow(name, valuesMap, consistencyInfo, new JsonInsert());
309 public boolean insertRow(String tablename, Map<String, Object> valuesMap,
310 Map<String, String> consistencyInfo, JsonInsert insObj) throws Exception {
311 // Note: https://docs.datastax.com/en/cql/3.0/cql/cql_reference/insert_r.html
312 String[] parts = tablename.split("\\.");
313 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(parts[0]);
314 TableMetadata tableInfo = ks.getTable(parts[1]);
316 StringBuilder fields = new StringBuilder();
317 StringBuilder values = new StringBuilder();
319 for (String key : valuesMap.keySet()) {
320 fields.append(prefix).append(key);
321 Object valueObj = valuesMap.get(key);
322 DataType colType = tableInfo.getColumn(key).getType();
323 values.append(prefix).append(convertToSqlDataType(colType, valueObj));
327 String suffix = getTTLSuffix(insObj);
328 String query = String.format("INSERT INTO %s (%s) VALUES (%s)%s;", tablename,
329 fields.toString(), values.toString(), suffix);
332 String consistency = extractConsistencyInfo(tablename, consistencyInfo);
333 executeCreateQuery(query, consistency);
337 public boolean lockRow(String name, Map<String, String> cols) {
343 * Select ALL rows in the table.
345 * @param tablename the name of the table
346 * @return a list of maps, one map per row
348 public List<Map<String, Object>> selectRows(final String tablename) {
349 return selectRows(tablename, new HashMap<String, String>());
352 public List<Map<String, Object>> selectRows(final String tablename, Map<String, String> cols) {
354 String tbl = tablename;
355 int ix = tbl.indexOf('.');
357 ns = tablename.substring(0, ix);
358 tbl = tablename.substring(ix + 1);
360 Select sel = QueryBuilder.select().all().from(ns, tbl);
361 Statement stmt = sel;
362 if (cols.size() == 1) {
363 // only handles 1 WHERE value right now
364 String k = cols.keySet().iterator().next();
365 Clause eqclause = QueryBuilder.eq(k, cols.get(k));
366 stmt = sel.where(eqclause);
368 ResultSet resultset = session.execute(stmt);
369 List<Map<String, Object>> results = new ArrayList<Map<String, Object>>();
370 for (Row row : resultset) {
371 Map<String, Object> map = new HashMap<String, Object>();
372 for (Definition definition : row.getColumnDefinitions()) {
373 map.put(definition.getName(),
374 readRow(row, definition.getName(), definition.getType()));
381 private Object readRow(final Row row, final String name, final DataType colType) {
382 switch (colType.getName()) {
384 return row.getLong(name);
386 return row.getBool(name);
388 return row.getDouble(name);
390 return row.getFloat(name);
392 return row.getInt(name);
394 return row.getMap(name, String.class, String.class);
396 return row.getUUID(name);
399 return row.getString(name);
401 return row.getVarint(name);
402 // These are not supported right now....
421 public List<Map<String, String>> OLDselectRows(String tablename, Map<String, String> cols) {
422 String query = String.format("SELECT * FROM %s", tablename);
423 if (cols.size() > 0) {
425 // String[] parts = tablename.split("\\.");
426 // KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(parts[0]);
427 // TableMetadata tableInfo = ks.getTable(parts[1]);
428 String whereclause = " WHERE";
430 for (String key : cols.keySet()) {
431 String val = cols.get(key);
432 // DataType colType = tableInfo.getColumn(key).getType();
433 whereclause = String.format("%s%s %s = '%s'", whereclause, prefix, key, val);
436 query += whereclause;
439 ResultSet resultset = session.execute(query);
440 List<Map<String, String>> results = new ArrayList<Map<String, String>>();
441 for (Row row : resultset) {
442 ColumnDefinitions colInfo = row.getColumnDefinitions();
443 Map<String, String> map = new HashMap<String, String>();
444 for (Definition definition : colInfo) {
445 // map.put(definition.getName(), (String)MusicDataStore.readRow(row,
446 // definition.getName(), definition.getType()));
453 public void updateRows(String tablename, Map<String, String> cols, Map<String, Object> vals)
455 Map<String, String> consistencyInfo = Collections.singletonMap("type", "eventual");
456 updateRows(tablename, cols, vals, consistencyInfo, new JsonInsert());
459 public void updateRows(String tablename, Map<String, String> cols, Map<String, Object> vals,
460 Map<String, String> consistencyInfo, JsonInsert insObj) throws Exception {
461 // https://docs.datastax.com/en/cql/3.0/cql/cql_reference/update_r.html
463 // obtain the field value pairs of the update
464 String[] parts = tablename.split("\\.");
465 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(parts[0]);
466 TableMetadata tableInfo = ks.getTable(parts[1]);
468 StringBuilder fields = new StringBuilder();
470 for (String key : vals.keySet()) {
471 Object valueObj = vals.get(key);
472 String valueString = convertToSqlDataType(tableInfo.getColumn(key).getType(), valueObj);
473 fields.append(prefix).append(key).append(" = ").append(valueString);
477 // get the row specifier
478 StringBuilder rows = new StringBuilder();
479 String primaryKey = "";
481 for (String key : cols.keySet()) {
482 String indValue = cols.get(key);
483 DataType colType = tableInfo.getColumn(key).getType();
484 String formattedValue = convertToSqlDataType(colType, indValue);
485 primaryKey = primaryKey + indValue;
486 rows.append(prefix).append(key).append(" = ").append(formattedValue);
490 String using = getTTLSuffix(insObj);
491 String query = String.format("UPDATE %s%s SET %s WHERE %s;", tablename, using,
492 fields.toString(), rows.toString());
495 String consistency = extractConsistencyInfo(tablename, consistencyInfo);
496 executeCreateQuery(query, consistency);
499 public void deleteRows(String tablename, Map<String, String> cols) {
501 String tbl = tablename;
502 int ix = tbl.indexOf('.');
504 ns = tablename.substring(0, ix);
505 tbl = tablename.substring(ix + 1);
507 Delete stmt = QueryBuilder.delete().from(ns, tbl);
508 if (cols.size() == 1) {
509 // only handles 1 WHERE value right now
510 String k = cols.keySet().iterator().next();
511 Clause eqclause = QueryBuilder.eq(k, cols.get(k));
512 session.execute(stmt.where(eqclause));
514 session.execute(stmt);
518 private String getTTLSuffix(JsonInsert insObj) {
519 String ttl = insObj.getTtl();
520 String timestamp = insObj.getTimestamp();
521 if (ttl != null && ttl.length() > 0) {
522 if (timestamp != null && timestamp.length() > 0) {
523 return " USING TTL " + ttl + " AND TIMESTAMP " + timestamp;
525 return " USING TTL " + ttl;
527 } else if (timestamp != null && timestamp.length() > 0) {
528 return " USING TIMESTAMP " + timestamp;
533 private MusicLockingService getLockingService() {
535 mls = new MusicLockingService(music_hosts[0]);
540 private String extractConsistencyInfo(String key, Map<String, String> consistencyInfo)
542 String consistency = "";
543 if (consistencyInfo.get("type").equalsIgnoreCase("atomic")) {
544 String lockId = consistencyInfo.get("lockId");
545 String lockName = lockId.substring(lockId.indexOf("$") + 1);
546 lockName = lockName.substring(0, lockName.indexOf("$"));
548 // first ensure that the lock name is correct before seeing if it has access
549 if (!lockName.equalsIgnoreCase(key))
550 throw new Exception("THIS LOCK IS NOT FOR THE KEY: " + key);
552 String lockStatus = getLockingService().isMyTurn(lockId) + "";
553 if (lockStatus.equalsIgnoreCase("false"))
554 throw new Exception("YOU DO NOT HAVE THE LOCK");
557 if (consistencyInfo.get("type").equalsIgnoreCase("eventual"))
559 throw new Exception("Consistency type " + consistency + " unknown!!");
562 // utility function to parse json map into sql like string
563 private String jsonMaptoSqlString(Map<String, Object> jMap, String lineDelimiter) {
566 for (Map.Entry<String, Object> entry : jMap.entrySet()) {
567 Object ot = entry.getValue();
568 String value = ot + "";
569 if (ot instanceof String) {
570 value = "'" + value + "'";
572 sql = String.format("%s%s'%s': %s", sql, prefix, entry.getKey(), value);
573 prefix = lineDelimiter;
578 private String convertToSqlDataType(DataType type, Object valueObj) {
579 switch (type.getName()) {
581 String t = valueObj.toString();
582 t = t.replaceAll("'", "''");
583 return "'" + t + "'";
585 @SuppressWarnings("unchecked")
586 Map<String, Object> otMap = (Map<String, Object>) valueObj;
587 return "{" + jsonMaptoSqlString(otMap, ",") + "}";
590 return valueObj.toString();
594 private void executeCreateQuery(String query, String consistency) throws Exception {
595 Statement statement = new SimpleStatement(query);
596 if (consistency.equalsIgnoreCase("atomic"))
597 statement.setConsistencyLevel(ConsistencyLevel.ALL);
598 else if (consistency.equalsIgnoreCase("eventual"))
599 statement.setConsistencyLevel(ConsistencyLevel.ONE);
601 throw new Exception("Consistency level " + consistency + " unknown!!");
602 session.execute(statement);