2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * ===================================================================
7 * Modifications Copyright (c) 2018-2019 IBM
8 * Modifications Copyright (c) 2019 Samsung
9 * ===================================================================
10 * Licensed under the Apache License, Version 2.0 (the "License");
11 * you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
14 * http://www.apache.org/licenses/LICENSE-2.0
16 * Unless required by applicable law or agreed to in writing, software
17 * distributed under the License is distributed on an "AS IS" BASIS,
18 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 * See the License for the specific language governing permissions and
20 * limitations under the License.
22 * ============LICENSE_END=============================================
23 * ====================================================================
26 package org.onap.music.datastore;
28 import java.nio.ByteBuffer;
29 import java.util.HashMap;
32 import org.onap.music.eelf.logging.EELFLoggerDelegate;
33 import org.onap.music.eelf.logging.format.AppMessages;
34 import org.onap.music.eelf.logging.format.ErrorSeverity;
35 import org.onap.music.eelf.logging.format.ErrorTypes;
36 import org.onap.music.exceptions.MusicQueryException;
37 import org.onap.music.exceptions.MusicServiceException;
38 import org.onap.music.lockingservice.cassandra.LockType;
39 import org.onap.music.main.CipherUtil;
40 import org.onap.music.main.MusicUtil;
41 import com.datastax.driver.core.Cluster;
42 import com.datastax.driver.core.ColumnDefinitions;
43 import com.datastax.driver.core.ColumnDefinitions.Definition;
44 import com.datastax.driver.core.ConsistencyLevel;
45 import com.datastax.driver.core.DataType;
46 import com.datastax.driver.core.HostDistance;
47 import com.datastax.driver.core.KeyspaceMetadata;
48 import com.datastax.driver.core.Metadata;
49 import com.datastax.driver.core.PoolingOptions;
50 import com.datastax.driver.core.ResultSet;
51 import com.datastax.driver.core.Row;
52 import com.datastax.driver.core.Session;
53 import com.datastax.driver.core.SimpleStatement;
54 import com.datastax.driver.core.SocketOptions;
55 import com.datastax.driver.core.TableMetadata;
56 import com.datastax.driver.core.exceptions.AlreadyExistsException;
57 import com.datastax.driver.core.exceptions.InvalidQueryException;
58 import com.datastax.driver.extras.codecs.enums.EnumNameCodec;
64 public class MusicDataStore {
66 public static final String CONSISTENCY_LEVEL_ONE = "ONE";
67 public static final String CONSISTENCY_LEVEL_QUORUM = "QUORUM";
68 public static final String CONSISTENCY_LEVEL_LOCAL_QUORUM = "LOCAL_QUORUM";
69 private Session session;
70 private Cluster cluster;
74 * Connect to default Cassandra address
76 public MusicDataStore() {
78 connectToCassaCluster(MusicUtil.getMyCassaHost());
79 } catch (MusicServiceException e) {
80 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), e);
89 public MusicDataStore(Cluster cluster, Session session) {
90 this.session = session;
98 public void setSession(Session session) {
99 this.session = session;
105 public Session getSession() {
112 public void setCluster(Cluster cluster) {
113 EnumNameCodec<LockType> lockTypeCodec = new EnumNameCodec<LockType>(LockType.class);
114 cluster.getConfiguration().getCodecRegistry().register(lockTypeCodec);
116 this.cluster = cluster;
119 public Cluster getCluster() {
124 private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class);
130 * @throws MusicServiceException
132 public MusicDataStore(String remoteIp) {
134 connectToCassaCluster(remoteIp);
135 } catch (MusicServiceException e) {
136 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), e);
143 public void close() {
148 * This method connects to cassandra cluster on specific address.
152 private void connectToCassaCluster(String address) throws MusicServiceException {
153 String[] addresses = null;
154 addresses = address.split(",");
155 PoolingOptions poolingOptions = new PoolingOptions();
157 .setConnectionsPerHost(HostDistance.LOCAL, 4, 10)
158 .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
161 if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
162 String cassPwd = CipherUtil.decryptPKC(MusicUtil.getCassPwd());
163 logger.info(EELFLoggerDelegate.applicationLogger,
164 "Building with credentials "+MusicUtil.getCassName()+" & "+ MusicUtil.getCassPwd());
165 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
166 .withCredentials(MusicUtil.getCassName(), cassPwd)
167 //.withLoadBalancingPolicy(new RoundRobinPolicy())
168 .withoutJMXReporting()
169 .withPoolingOptions(poolingOptions)
171 new SocketOptions().setConnectTimeoutMillis(MusicUtil.getCassandraConnectTimeOutMS())
172 .setReadTimeoutMillis(MusicUtil.getCassandraReadTimeOutMS()))
173 .addContactPoints(addresses).build();
175 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
176 .withoutJMXReporting()
177 .withPoolingOptions(poolingOptions)
178 .withSocketOptions(new SocketOptions()
179 .setConnectTimeoutMillis(MusicUtil.getCassandraConnectTimeOutMS())
180 .setReadTimeoutMillis(MusicUtil.getCassandraReadTimeOutMS()))
181 .addContactPoints(addresses)
185 this.setCluster(cluster);
186 Metadata metadata = this.cluster.getMetadata();
187 logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
188 + metadata.getClusterName() + " at " + address);
191 session = this.cluster.connect();
192 } catch (Exception ex) {
193 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.CASSANDRACONNECTIVITY,
194 ErrorSeverity.ERROR, ErrorTypes.SERVICEUNAVAILABLE, ex);
195 throw new MusicServiceException(
196 "Error while connecting to Cassandra cluster.. " + ex.getMessage());
207 public DataType returnColumnDataType(String keyspace, String tableName, String columnName) {
208 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
209 TableMetadata table = ks.getTable(tableName);
210 return table.getColumn(columnName).getType();
218 * @return TableMetadata
220 public TableMetadata returnColumnMetadata(String keyspace, String tableName) {
221 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
222 return ks.getTable(tableName);
229 * @return TableMetadata
231 public KeyspaceMetadata returnKeyspaceMetadata(String keyspace) {
232 return cluster.getMetadata().getKeyspace(keyspace);
237 * Utility function to return the Java specific object type.
244 public Object getColValue(Row row, String colName, DataType colType) {
246 switch (colType.getName()) {
248 return row.getString(colName);
250 return row.getUUID(colName);
252 return row.getVarint(colName);
254 return row.getLong(colName);
256 return row.getInt(colName);
258 return row.getFloat(colName);
260 return row.getDouble(colName);
262 return row.getBool(colName);
264 return row.getMap(colName, String.class, String.class);
266 return row.getList(colName, String.class);
272 public byte[] getBlobValue(Row row, String colName, DataType colType) {
273 ByteBuffer bb = row.getBytes(colName);
277 public boolean doesRowSatisfyCondition(Row row, Map<String, Object> condition) throws Exception {
278 ColumnDefinitions colInfo = row.getColumnDefinitions();
280 for (Map.Entry<String, Object> entry : condition.entrySet()) {
281 String colName = entry.getKey();
282 DataType colType = colInfo.getType(colName);
283 Object columnValue = getColValue(row, colName, colType);
284 Object conditionValue = MusicUtil.convertToActualDataType(colType, entry.getValue());
285 if (columnValue.equals(conditionValue) == false)
292 * Utility function to store ResultSet values in to a MAP for output.
297 public Map<String, HashMap<String, Object>> marshalData(ResultSet results) {
298 Map<String, HashMap<String, Object>> resultMap =
301 for (Row row : results) {
302 ColumnDefinitions colInfo = row.getColumnDefinitions();
303 HashMap<String, Object> resultOutput = new HashMap<>();
304 for (Definition definition : colInfo) {
305 if (!(("vector_ts").equals(definition.getName()))) {
306 if(definition.getType().toString().toLowerCase().contains("blob")) {
307 resultOutput.put(definition.getName(),
308 getBlobValue(row, definition.getName(), definition.getType()));
310 resultOutput.put(definition.getName(),
311 getColValue(row, definition.getName(), definition.getType()));
315 resultMap.put("row " + counter, resultOutput);
322 // Prepared Statements 1802 additions
324 public boolean executePut(PreparedQueryObject queryObject, String consistency)
325 throws MusicServiceException, MusicQueryException {
326 return executePut(queryObject, consistency, 0);
329 * This Method performs DDL and DML operations on Cassandra using specified consistency level
331 * @param queryObject Object containing cassandra prepared query and values.
332 * @param consistency Specify consistency level for data synchronization across cassandra
334 * @return Boolean Indicates operation success or failure
335 * @throws MusicServiceException
336 * @throws MusicQueryException
338 public boolean executePut(PreparedQueryObject queryObject, String consistency,long timeSlot)
339 throws MusicServiceException, MusicQueryException {
341 boolean result = false;
342 long timeOfWrite = System.currentTimeMillis();
343 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
344 logger.error(EELFLoggerDelegate.errorLogger, queryObject.getQuery(),AppMessages.QUERYERROR, ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
345 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
346 + queryObject.getQuery() + "]");
348 logger.debug(EELFLoggerDelegate.applicationLogger,
349 "In preprared Execute Put: the actual insert query:"
350 + queryObject.getQuery() + "; the values"
351 + queryObject.getValues());
352 SimpleStatement preparedInsert = null;
355 preparedInsert = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray());
356 if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
357 logger.info(EELFLoggerDelegate.applicationLogger, "Executing critical put query");
358 preparedInsert.setConsistencyLevel(ConsistencyLevel.QUORUM);
359 } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
360 logger.info(EELFLoggerDelegate.applicationLogger, "Executing simple put query");
361 if(queryObject.getConsistency() == null)
362 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
364 preparedInsert.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
365 } else if (consistency.equalsIgnoreCase(MusicUtil.ONE)) {
366 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
367 } else if (consistency.equalsIgnoreCase(MusicUtil.QUORUM)) {
368 preparedInsert.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
369 } else if (consistency.equalsIgnoreCase(MusicUtil.ALL)) {
370 preparedInsert.setConsistencyLevel(ConsistencyLevel.ALL);
372 long timestamp = MusicUtil.v2sTimeStampInMicroseconds(timeSlot, timeOfWrite);
373 preparedInsert.setDefaultTimestamp(timestamp);
375 ResultSet rs = session.execute(preparedInsert);
376 result = rs.wasApplied();
377 } catch (AlreadyExistsException ae) {
378 throw new MusicServiceException("Already Exists Exception: " + ae.getMessage());
379 } catch (InvalidQueryException e) {
380 if (e.getMessage().contains("unconfigured table")) {
381 throw new MusicServiceException("Invalid Query Exception: " + e.getMessage());
383 logger.info(EELFLoggerDelegate.applicationLogger, "Query Exception: " + e.getMessage(),
384 AppMessages.SESSIONFAILED + " [" + queryObject.getQuery() + "]", ErrorSeverity.INFO,
385 ErrorTypes.QUERYERROR, e);
386 throw new MusicServiceException("Query Exception: " + e.getMessage());
388 } catch (Exception e) {
389 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),
390 AppMessages.SESSIONFAILED + " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR,
391 ErrorTypes.QUERYERROR, e);
392 throw new MusicServiceException("Executing Session Failure for Request = " + "[" + queryObject.getQuery()
393 + "]" + " Reason = " + e.getMessage());
400 * This method performs DDL operations on Cassandra using consistency level ONE.
402 * @param queryObject Object containing cassandra prepared query and values.
404 * @throws MusicServiceException
405 * @throws MusicQueryException
407 public ResultSet executeEventualGet(PreparedQueryObject queryObject)
408 throws MusicServiceException, MusicQueryException {
409 CacheAccess<String, PreparedStatement> queryBank = CachingUtil.getStatementBank();
410 PreparedStatement preparedEventualGet = null;
411 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
412 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
413 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
414 + queryObject.getQuery() + "]");
416 logger.info(EELFLoggerDelegate.applicationLogger,
417 "Executing Eventual get query:" + queryObject.getQuery());
419 ResultSet results = null;
421 if(queryBank.get(queryObject.getQuery()) != null )
422 preparedEventualGet=queryBank.get(queryObject.getQuery());
424 preparedEventualGet = session.prepare(queryObject.getQuery());
425 CachingUtil.updateStatementBank(queryObject.getQuery(), preparedEventualGet);
427 if(queryObject.getConsistency() == null) {
428 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE);
430 preparedEventualGet.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
432 results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
434 } catch (Exception ex) {
435 logger.error("Exception", ex);
436 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
437 throw new MusicServiceException(ex.getMessage());
444 * This method performs DDL operation on Cassandra using consistency level QUORUM.
446 * @param queryObject Object containing cassandra prepared query and values.
448 * @throws MusicServiceException
449 * @throws MusicQueryException
451 public ResultSet executeCriticalGet(PreparedQueryObject queryObject)
452 throws MusicServiceException, MusicQueryException {
453 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
454 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
455 throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
456 + queryObject.getQuery() + "]");
458 logger.info(EELFLoggerDelegate.applicationLogger,
459 "Executing Critical get query:" + queryObject.getQuery());
460 PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
461 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.QUORUM);
462 ResultSet results = null;
464 results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
465 } catch (Exception ex) {
466 logger.error("Exception", ex);
467 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
468 throw new MusicServiceException(ex.getMessage());
474 public ResultSet executeGet(PreparedQueryObject queryObject,String consistencyLevel) throws MusicQueryException, MusicServiceException {
475 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
476 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
477 throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
478 + queryObject.getQuery() + "]");
480 ResultSet results = null;
482 SimpleStatement statement = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray());
483 if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_ONE)) {
484 statement.setConsistencyLevel(ConsistencyLevel.ONE);
485 } else if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_QUORUM)) {
486 statement.setConsistencyLevel(ConsistencyLevel.QUORUM);
487 } else if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_LOCAL_QUORUM)) {
488 statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
491 results = session.execute(statement);
493 } catch (Exception ex) {
494 logger.error(EELFLoggerDelegate.errorLogger, "Execute Get Error" + ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject
495 .getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, ex);
496 throw new MusicServiceException("Execute Get Error" + ex.getMessage());
504 * This method performs DDL operations on Cassandra using consistency level ONE.
506 * @param queryObject Object containing cassandra prepared query and values.
508 public ResultSet executeOneConsistencyGet(PreparedQueryObject queryObject)
509 throws MusicServiceException, MusicQueryException {
510 return executeGet(queryObject, CONSISTENCY_LEVEL_ONE);
515 * This method performs DDL operation on Cassandra using consistency level LOCAL_QUORUM.
517 * @param queryObject Object containing cassandra prepared query and values.
519 public ResultSet executeLocalQuorumConsistencyGet(PreparedQueryObject queryObject)
520 throws MusicServiceException, MusicQueryException {
521 return executeGet(queryObject, CONSISTENCY_LEVEL_LOCAL_QUORUM);
526 * This method performs DDL operation on Cassandra using consistency level QUORUM.
528 * @param queryObject Object containing cassandra prepared query and values.
530 public ResultSet executeQuorumConsistencyGet(PreparedQueryObject queryObject)
531 throws MusicServiceException, MusicQueryException {
532 return executeGet(queryObject, CONSISTENCY_LEVEL_QUORUM);