2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * ===================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 * ============LICENSE_END=============================================
20 * ====================================================================
22 package org.onap.music.client;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.Collections;
27 import java.util.HashMap;
28 import java.util.HashSet;
29 import java.util.List;
32 import org.onap.music.datastore.jsonobjects.JsonInsert;
33 import org.onap.music.datastore.jsonobjects.JsonKeySpace;
34 import org.onap.music.datastore.jsonobjects.JsonTable;
35 import org.onap.music.eelf.logging.EELFLoggerDelegate;
36 import org.onap.music.lockingservice.MusicLockingService;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39 import com.datastax.driver.core.Cluster;
40 import com.datastax.driver.core.ColumnDefinitions;
41 import com.datastax.driver.core.ColumnDefinitions.Definition;
42 import com.datastax.driver.core.ConsistencyLevel;
43 import com.datastax.driver.core.DataType;
44 import com.datastax.driver.core.KeyspaceMetadata;
45 import com.datastax.driver.core.ResultSet;
46 import com.datastax.driver.core.Row;
47 import com.datastax.driver.core.Session;
48 import com.datastax.driver.core.SimpleStatement;
49 import com.datastax.driver.core.Statement;
50 import com.datastax.driver.core.TableMetadata;
51 import com.datastax.driver.core.querybuilder.Clause;
52 import com.datastax.driver.core.querybuilder.Delete;
53 import com.datastax.driver.core.querybuilder.QueryBuilder;
54 import com.datastax.driver.core.querybuilder.Select;
57 * A MUSIC client that talks directly to Cassandra/ZooKeeper. This was taken, and slightly modified,
58 * from the REST version of the code.
62 public class MusicClient {
63 private EELFLoggerDelegate LOG = EELFLoggerDelegate.getLogger(MusicClient.class);
65 private final String[] music_hosts; // array of hosts in the music cluster
66 private Cluster cluster; // MUSIC Cassandra cluster
67 private Session session; // MUSIC Cassandra session
68 private MusicLockingService mls; // ZooKeeper
69 private final Set<String> lockNames;// set of active lock names
72 * Create a MUSIC client that talks to MUSIC on localhost.
74 public MusicClient() {
/**
 * Create a MUSIC client that talks to MUSIC on a remote host. The string <i>hosts</i> is a
 * comma-separated list of IP addresses for remote instances of Cassandra/ZooKeeper.
 * Whitespace around each entry is ignored.
 *
 * @param hosts the list of hostnames
 * @throws NullPointerException if <i>hosts</i> is null
 */
public MusicClient(String hosts) {
    if (hosts == null) {
        throw new NullPointerException("hosts must not be null");
    }
    music_hosts = hosts.split(",");
    for (int i = 0; i < music_hosts.length; i++) {
        music_hosts[i] = music_hosts[i].trim();
    }
    lockNames = new HashSet<String>();

    // The cluster field is always unset while the constructor runs, so build it unconditionally.
    LOG.debug("Initializing MUSIC Client with endpoints " + hosts);
    cluster = Cluster.builder().addContactPoints(music_hosts).build();
    try {
        session = cluster.connect();
    } catch (RuntimeException e) {
        // Do not leak the cluster's resources when the session cannot be opened.
        cluster.close();
        cluster = null;
        throw e;
    }
}
96 * Close the connection to MUSIC.
99 if (session != null) {
103 if (cluster != null) {
110 * Be sure to close the connection to MUSIC when this object is GC-ed.
113 protected void finalize() {
118 * Return a String representation of the music hosts used by this object.
123 public String toString() {
124 List<String> t = Arrays.asList(music_hosts);
125 return "MUSIC hosts=" + t.toString();
131 * @see org.onap.music.lockingservice.MusicLockingService#createLock(String)
132 * @param lockName the lock name
135 public String createLock(String lockName) {
136 String ln = "/" + lockName;
137 synchronized (lockNames) {
140 return getLockingService().createLockId(ln);
146 * @see org.onap.music.lockingservice.MusicLockingService#lock(String)
147 * @param lockName the lock name
150 public boolean acquireLock(String lockName) {
151 return getLockingService().isMyTurn(lockName);
155 * Get the lock holder.
157 * @see org.onap.music.lockingservice.MusicLockingService#currentLockHolder(String)
158 * @param lockName the lock name
161 public String getLockHolder(String lockName) {
162 return getLockingService().whoseTurnIsIt("/" + lockName);
168 * @see org.onap.music.lockingservice.MusicLockingService#unlock(String)
169 * @param lockName the lock name
171 public void unlockLock(String lockName) {
172 getLockingService().unlockAndDeleteId(lockName);
178 * @see org.onap.music.lockingservice.MusicLockingService#deleteLock(String)
179 * @param lockName the lock name
181 public void deleteLock(String lockName) {
182 String ln = "/" + lockName;
183 synchronized (lockNames) {
184 lockNames.remove(ln);
186 getLockingService().deleteLock(ln);
192 * @see org.onap.music.lockingservice.MusicLockingService#deleteLock(String)
195 public boolean deleteAllLocks() {
196 synchronized (lockNames) {
197 for (String lockName : lockNames) {
198 deleteLock(lockName);
206 * Create a keyspace using the default replication configuration.
208 * @param keyspaceName the name of the keyspace
209 * @return always true currently
210 * @throws Exception Cassandra exceptions are passed through
212 public boolean createKeyspace(String keyspaceName) throws Exception {
213 Map<String, Object> repl = new HashMap<String, Object>();
214 repl.put("class", "SimpleStrategy");
215 repl.put("replication_factor", 1);
216 Map<String, String> consistencyInfo = Collections.singletonMap("type", "eventual");
217 JsonKeySpace jsonKp = new JsonKeySpace();
218 jsonKp.setConsistencyInfo(consistencyInfo);
219 jsonKp.setDurabilityOfWrites("true");
220 jsonKp.setReplicationInfo(repl);
221 return createKeyspace(keyspaceName, jsonKp);
224 public boolean createKeyspace(String keyspaceName, JsonKeySpace kspObject) throws Exception {
225 String consistency = extractConsistencyInfo(keyspaceName, kspObject.getConsistencyInfo());
226 Map<String, Object> replicationInfo = kspObject.getReplicationInfo();
227 String durability = "";
228 if (kspObject.getDurabilityOfWrites() != null)
229 durability = " AND durable_writes = " + kspObject.getDurabilityOfWrites();
230 String query = String.format(
231 "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = { %s } %s;",
232 keyspaceName, jsonMaptoSqlString(replicationInfo, ","), durability);
233 LOG.info(EELFLoggerDelegate.applicationLogger,query);
234 executeCreateQuery(query, consistency);
// Drops the keyspace named keyspaceName.
// The consistency string is derived from kspObject's consistency info, the DROP KEYSPACE
// statement is logged, and it is then run through executeCreateQuery.
// The keyspace name is interpolated into the statement text as-is.
// NOTE(review): the return statement is not visible in this excerpt; confirm which boolean
// callers receive.
238 public boolean dropKeyspace(String keyspaceName, JsonKeySpace kspObject) throws Exception {
239 String consistency = extractConsistencyInfo(keyspaceName, kspObject.getConsistencyInfo());
240 String query = String.format("DROP KEYSPACE %s;", keyspaceName);
241 LOG.info(EELFLoggerDelegate.applicationLogger,query);
242 executeCreateQuery(query, consistency);
246 public boolean createTable(String tablename, Map<String, String> cols) throws Exception {
247 JsonTable tableObj = new JsonTable();
248 Map<String, String> map = new HashMap<String, String>(); // This should be in the
250 map.put("type", "eventual");
251 tableObj.setConsistencyInfo(map);
252 return createTable(tablename, cols, tableObj);
// Creates a table with CREATE TABLE IF NOT EXISTS.
// cols maps each column name to the type text that follows it in the column list.
// tableObj supplies the consistency info for the statement.
// NOTE(review): the declaration and update of `prefix`, the remaining String.format
// arguments and the return statement are not visible in this excerpt; confirm them against
// the full file.
255 public boolean createTable(String tablename, Map<String, String> cols, JsonTable tableObj)
257 // Note: https://docs.datastax.com/en/cql/3.0/cql/cql_reference/create_table_r.html
259 // first read the information about the table fields
260 StringBuilder fields = new StringBuilder();
262 for (String key : cols.keySet()) {
// each entry is appended as "<separator><column name> <column type>"
263 fields.append(prefix).append(key).append(" ").append(cols.get(key));
267 String query = String.format("CREATE TABLE IF NOT EXISTS %s (%s);", tablename,
// the consistency string is derived from the table object before the statement is run
271 String consistency = extractConsistencyInfo(tablename, tableObj.getConsistencyInfo());
272 executeCreateQuery(query, consistency);
276 public boolean dropTable(String name) {
281 public boolean insertRow(String name, Map<String, Object> valuesMap) throws Exception {
282 Map<String, String> consistencyInfo = Collections.singletonMap("type", "eventual");
283 return insertRow(name, valuesMap, consistencyInfo, new JsonInsert());
// Inserts one row into tablename.
// tablename is split on "." into keyspace (parts[0]) and table (parts[1]); the table
// metadata is used to look up each column's type so the value can be rendered as CQL text.
// insObj supplies the optional TTL / TIMESTAMP suffix; consistencyInfo selects the
// consistency string passed to executeCreateQuery.
// NOTE(review): a tablename without "." fails at parts[1]; presumably an unknown keyspace,
// table or column yields null metadata and an NPE here - confirm.
// NOTE(review): the declaration/update of `prefix` and the return statement are not visible
// in this excerpt.
286 public boolean insertRow(String tablename, Map<String, Object> valuesMap,
287 Map<String, String> consistencyInfo, JsonInsert insObj) throws Exception {
288 // Note: https://docs.datastax.com/en/cql/3.0/cql/cql_reference/insert_r.html
289 String[] parts = tablename.split("\\.");
290 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(parts[0]);
291 TableMetadata tableInfo = ks.getTable(parts[1]);
// column names and rendered values are accumulated in parallel, in the same order
293 StringBuilder fields = new StringBuilder();
294 StringBuilder values = new StringBuilder();
296 for (String key : valuesMap.keySet()) {
297 fields.append(prefix).append(key);
298 Object valueObj = valuesMap.get(key);
299 DataType colType = tableInfo.getColumn(key).getType();
300 values.append(prefix).append(convertToSqlDataType(colType, valueObj));
// suffix is the USING TTL / TIMESTAMP text produced by getTTLSuffix
304 String suffix = getTTLSuffix(insObj);
305 String query = String.format("INSERT INTO %s (%s) VALUES (%s)%s;", tablename,
306 fields.toString(), values.toString(), suffix);
307 LOG.info(EELFLoggerDelegate.applicationLogger,query);
309 String consistency = extractConsistencyInfo(tablename, consistencyInfo);
310 executeCreateQuery(query, consistency);
314 public boolean lockRow(String name, Map<String, String> cols) {
320 * Select ALL rows in the table.
322 * @param tablename the name of the table
323 * @return a list of maps, one map per row
325 public List<Map<String, Object>> selectRows(final String tablename) {
326 return selectRows(tablename, new HashMap<String, String>());
// Selects rows from tablename and returns one map (column name -> value) per row.
// When cols holds exactly one entry, it becomes an equality WHERE clause; the visible code
// adds no clause for any other size of cols.
// NOTE(review): the declaration of `ns`, the condition guarding the keyspace/table split,
// and the lines that collect and return `results` are not visible in this excerpt.
329 public List<Map<String, Object>> selectRows(final String tablename, Map<String, String> cols) {
331 String tbl = tablename;
332 int ix = tbl.indexOf('.');
// text before the first '.' is used as the keyspace, text after it as the table name
334 ns = tablename.substring(0, ix);
335 tbl = tablename.substring(ix + 1);
337 Select sel = QueryBuilder.select().all().from(ns, tbl);
338 Statement stmt = sel;
339 if (cols.size() == 1) {
340 // only handles 1 WHERE value right now
341 String k = cols.keySet().iterator().next();
342 Clause eqclause = QueryBuilder.eq(k, cols.get(k));
343 stmt = sel.where(eqclause);
345 ResultSet resultset = session.execute(stmt);
346 List<Map<String, Object>> results = new ArrayList<Map<String, Object>>();
347 for (Row row : resultset) {
348 Map<String, Object> map = new HashMap<String, Object>();
// every column of the row is read through readRow using the column's declared type
349 for (Definition definition : row.getColumnDefinitions()) {
350 map.put(definition.getName(),
351 readRow(row, definition.getName(), definition.getType()));
// Reads column `name` from `row`, choosing the typed getter by switching on
// colType.getName().
// NOTE(review): the case labels between the returns are not visible in this excerpt, so the
// mapping from column type to getter must be confirmed against the full file.
358 private Object readRow(final Row row, final String name, final DataType colType) {
359 switch (colType.getName()) {
361 return row.getLong(name);
363 return row.getBool(name);
365 return row.getDouble(name);
367 return row.getFloat(name);
369 return row.getInt(name);
// map columns are read with String keys and String values
371 return row.getMap(name, String.class, String.class);
373 return row.getUUID(name);
376 return row.getString(name);
378 return row.getVarint(name);
379 // These are not supported right now....
// Older, string-built variant of selectRows.
// It builds "SELECT * FROM <tablename>" and, when cols is non-empty, appends a WHERE clause
// with one "<key> = '<value>'" term per entry.
// NOTE(review): values are wrapped in single quotes without any escaping, so a value
// containing a quote changes the statement text.
// NOTE(review): the `prefix` handling, the row-to-map population (currently commented out)
// and the return statement are not visible in this excerpt.
398 public List<Map<String, String>> OLDselectRows(String tablename, Map<String, String> cols) {
399 String query = String.format("SELECT * FROM %s", tablename);
400 if (cols.size() > 0) {
402 // String[] parts = tablename.split("\\.");
403 // KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(parts[0]);
404 // TableMetadata tableInfo = ks.getTable(parts[1]);
405 String whereclause = " WHERE";
407 for (String key : cols.keySet()) {
408 String val = cols.get(key);
409 // DataType colType = tableInfo.getColumn(key).getType();
// each iteration re-formats the whole clause with one more term appended
410 whereclause = String.format("%s%s %s = '%s'", whereclause, prefix, key, val);
413 query += whereclause;
416 ResultSet resultset = session.execute(query);
417 List<Map<String, String>> results = new ArrayList<Map<String, String>>();
418 for (Row row : resultset) {
419 ColumnDefinitions colInfo = row.getColumnDefinitions();
420 Map<String, String> map = new HashMap<String, String>();
421 for (Definition definition : colInfo) {
422 // map.put(definition.getName(), (String)MusicDataStore.readRow(row,
423 // definition.getName(), definition.getType()));
430 public void updateRows(String tablename, Map<String, String> cols, Map<String, Object> vals)
432 Map<String, String> consistencyInfo = Collections.singletonMap("type", "eventual");
433 updateRows(tablename, cols, vals, consistencyInfo, new JsonInsert());
// Updates rows of tablename.
// vals supplies the "column = value" pairs of the SET clause and cols supplies the
// "column = value" terms of the WHERE clause. Both are rendered as CQL text using the column
// types found in the table metadata.
// insObj supplies the optional USING TTL / TIMESTAMP text; consistencyInfo selects the
// consistency string passed to executeCreateQuery.
// NOTE(review): a tablename without "." fails at parts[1].
// NOTE(review): the declarations/updates of `prefix` (the separators used in each clause)
// are not visible in this excerpt.
436 public void updateRows(String tablename, Map<String, String> cols, Map<String, Object> vals,
437 Map<String, String> consistencyInfo, JsonInsert insObj) throws Exception {
438 // https://docs.datastax.com/en/cql/3.0/cql/cql_reference/update_r.html
440 // obtain the field value pairs of the update
441 String[] parts = tablename.split("\\.");
442 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(parts[0]);
443 TableMetadata tableInfo = ks.getTable(parts[1]);
445 StringBuilder fields = new StringBuilder();
447 for (String key : vals.keySet()) {
448 Object valueObj = vals.get(key);
449 String valueString = convertToSqlDataType(tableInfo.getColumn(key).getType(), valueObj);
450 fields.append(prefix).append(key).append(" = ").append(valueString);
454 // get the row specifier
455 StringBuilder rows = new StringBuilder();
// NOTE(review): primaryKey accumulates the raw values but no visible line reads it
456 String primaryKey = "";
458 for (String key : cols.keySet()) {
459 String indValue = cols.get(key);
460 DataType colType = tableInfo.getColumn(key).getType();
461 String formattedValue = convertToSqlDataType(colType, indValue);
462 primaryKey = primaryKey + indValue;
463 rows.append(prefix).append(key).append(" = ").append(formattedValue);
// in an UPDATE the USING clause sits between the table name and SET
467 String using = getTTLSuffix(insObj);
468 String query = String.format("UPDATE %s%s SET %s WHERE %s;", tablename, using,
469 fields.toString(), rows.toString());
472 String consistency = extractConsistencyInfo(tablename, consistencyInfo);
473 executeCreateQuery(query, consistency);
// Deletes rows from tablename.
// When cols holds exactly one entry, that entry becomes an equality WHERE clause on the
// DELETE statement.
// NOTE(review): the declaration of `ns`, the condition guarding the keyspace/table split,
// and the branch structure between the two session.execute calls are not visible in this
// excerpt. Confirm whether the final execute (no WHERE clause) is an else branch.
476 public void deleteRows(String tablename, Map<String, String> cols) {
478 String tbl = tablename;
479 int ix = tbl.indexOf('.');
// text before the first '.' is used as the keyspace, text after it as the table name
481 ns = tablename.substring(0, ix);
482 tbl = tablename.substring(ix + 1);
484 Delete stmt = QueryBuilder.delete().from(ns, tbl);
485 if (cols.size() == 1) {
486 // only handles 1 WHERE value right now
487 String k = cols.keySet().iterator().next();
488 Clause eqclause = QueryBuilder.eq(k, cols.get(k));
489 session.execute(stmt.where(eqclause));
491 session.execute(stmt);
495 private String getTTLSuffix(JsonInsert insObj) {
496 String ttl = insObj.getTtl();
497 String timestamp = insObj.getTimestamp();
498 if (ttl != null && ttl.length() > 0) {
499 if (timestamp != null && timestamp.length() > 0) {
500 return " USING TTL " + ttl + " AND TIMESTAMP " + timestamp;
502 return " USING TTL " + ttl;
504 } else if (timestamp != null && timestamp.length() > 0) {
505 return " USING TIMESTAMP " + timestamp;
510 private MusicLockingService getLockingService() {
512 mls = new MusicLockingService(music_hosts[0]);
517 private String extractConsistencyInfo(String key, Map<String, String> consistencyInfo)
519 String consistency = "";
520 if (consistencyInfo.get("type").equalsIgnoreCase("atomic")) {
521 String lockId = consistencyInfo.get("lockId");
522 String lockName = lockId.substring(lockId.indexOf("$") + 1);
523 lockName = lockName.substring(0, lockName.indexOf("$"));
525 // first ensure that the lock name is correct before seeing if it has access
526 if (!lockName.equalsIgnoreCase(key))
527 throw new Exception("THIS LOCK IS NOT FOR THE KEY: " + key);
529 String lockStatus = getLockingService().isMyTurn(lockId) + "";
530 if (lockStatus.equalsIgnoreCase("false"))
531 throw new Exception("YOU DO NOT HAVE THE LOCK");
534 if (consistencyInfo.get("type").equalsIgnoreCase("eventual"))
536 throw new Exception("Consistency type " + consistency + " unknown!!");
539 // utility function to parse json map into sql like string
540 private String jsonMaptoSqlString(Map<String, Object> jMap, String lineDelimiter) {
543 for (Map.Entry<String, Object> entry : jMap.entrySet()) {
544 Object ot = entry.getValue();
545 String value = ot + "";
546 if (ot instanceof String) {
547 value = "'" + value + "'";
549 sql = String.format("%s%s'%s': %s", sql, prefix, entry.getKey(), value);
550 prefix = lineDelimiter;
// Renders valueObj as CQL literal text according to the column type.
// NOTE(review): the case labels of the switch are not visible in this excerpt, so which
// column types reach each branch must be confirmed against the full file.
555 private String convertToSqlDataType(DataType type, Object valueObj) {
556 switch (type.getName()) {
// quoted-text branch: embedded single quotes are doubled, then the value is wrapped in quotes
558 String t = valueObj.toString();
559 t = t.replaceAll("'", "''");
560 return "'" + t + "'";
// map branch: the value is cast to a map and rendered as a brace-wrapped map literal
562 @SuppressWarnings("unchecked")
563 Map<String, Object> otMap = (Map<String, Object>) valueObj;
564 return "{" + jsonMaptoSqlString(otMap, ",") + "}";
// remaining branch: the value's string form is used unquoted
567 return valueObj.toString();
571 private void executeCreateQuery(String query, String consistency) throws Exception {
572 Statement statement = new SimpleStatement(query);
573 if (consistency.equalsIgnoreCase("atomic"))
574 statement.setConsistencyLevel(ConsistencyLevel.ALL);
575 else if (consistency.equalsIgnoreCase("eventual"))
576 statement.setConsistencyLevel(ConsistencyLevel.ONE);
578 throw new Exception("Consistency level " + consistency + " unknown!!");
579 session.execute(statement);