2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * ===================================================================
7 * Modifications Copyright (c) 2018-2019 IBM
8 * Modifications Copyright (c) 2019 Samsung
9 * ===================================================================
10 * Licensed under the Apache License, Version 2.0 (the "License");
11 * you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
14 * http://www.apache.org/licenses/LICENSE-2.0
16 * Unless required by applicable law or agreed to in writing, software
17 * distributed under the License is distributed on an "AS IS" BASIS,
18 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 * See the License for the specific language governing permissions and
20 * limitations under the License.
22 * ============LICENSE_END=============================================
23 * ====================================================================
26 package org.onap.music.datastore;
28 import java.nio.ByteBuffer;
29 import java.util.HashMap;
32 import org.onap.music.eelf.logging.EELFLoggerDelegate;
33 import org.onap.music.eelf.logging.format.AppMessages;
34 import org.onap.music.eelf.logging.format.ErrorSeverity;
35 import org.onap.music.eelf.logging.format.ErrorTypes;
36 import org.onap.music.exceptions.MusicQueryException;
37 import org.onap.music.exceptions.MusicServiceException;
38 import org.onap.music.lockingservice.cassandra.LockType;
39 import org.onap.music.main.CipherUtil;
40 import org.onap.music.main.MusicUtil;
41 import com.datastax.driver.core.Cluster;
42 import com.datastax.driver.core.ColumnDefinitions;
43 import com.datastax.driver.core.ColumnDefinitions.Definition;
44 import com.datastax.driver.core.ConsistencyLevel;
45 import com.datastax.driver.core.DataType;
46 import com.datastax.driver.core.HostDistance;
47 import com.datastax.driver.core.KeyspaceMetadata;
48 import com.datastax.driver.core.Metadata;
49 import com.datastax.driver.core.PoolingOptions;
50 import com.datastax.driver.core.ResultSet;
51 import com.datastax.driver.core.Row;
52 import com.datastax.driver.core.Session;
53 import com.datastax.driver.core.SimpleStatement;
54 import com.datastax.driver.core.TableMetadata;
55 import com.datastax.driver.core.exceptions.AlreadyExistsException;
56 import com.datastax.driver.core.exceptions.InvalidQueryException;
57 import com.datastax.driver.extras.codecs.enums.EnumNameCodec;
63 public class MusicDataStore {
64 private Session session;
65 private Cluster cluster;
69 * Connect to default Cassandra address
71 public MusicDataStore() {
73 connectToCassaCluster(MusicUtil.getMyCassaHost());
74 } catch (MusicServiceException e) {
75 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), e);
84 public MusicDataStore(Cluster cluster, Session session) {
85 this.session = session;
93 public void setSession(Session session) {
94 this.session = session;
100 public Session getSession() {
107 public void setCluster(Cluster cluster) {
108 EnumNameCodec<LockType> lockTypeCodec = new EnumNameCodec<LockType>(LockType.class);
109 cluster.getConfiguration().getCodecRegistry().register(lockTypeCodec);
111 this.cluster = cluster;
114 public Cluster getCluster() {
119 private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class);
125 * @throws MusicServiceException
127 public MusicDataStore(String remoteIp) {
129 connectToCassaCluster(remoteIp);
130 } catch (MusicServiceException e) {
131 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), e);
138 public void close() {
143 * This method connects to cassandra cluster on specific address.
147 private void connectToCassaCluster(String address) throws MusicServiceException {
148 String[] addresses = null;
149 addresses = address.split(",");
150 PoolingOptions poolingOptions = new PoolingOptions();
152 .setConnectionsPerHost(HostDistance.LOCAL, 4, 10)
153 .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
156 if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
157 String cassPwd = CipherUtil.decryptPKC(MusicUtil.getCassPwd());
158 logger.info(EELFLoggerDelegate.applicationLogger,
159 "Building with credentials "+MusicUtil.getCassName()+" & "+ MusicUtil.getCassPwd());
160 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
161 .withCredentials(MusicUtil.getCassName(), cassPwd)
162 //.withLoadBalancingPolicy(new RoundRobinPolicy())
163 .withoutJMXReporting()
164 .withPoolingOptions(poolingOptions)
165 .addContactPoints(addresses).build();
167 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
168 .withoutJMXReporting()
169 .withPoolingOptions(poolingOptions)
170 .addContactPoints(addresses)
174 this.setCluster(cluster);
175 Metadata metadata = this.cluster.getMetadata();
176 logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
177 + metadata.getClusterName() + " at " + address);
180 session = this.cluster.connect();
181 } catch (Exception ex) {
182 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.CASSANDRACONNECTIVITY,
183 ErrorSeverity.ERROR, ErrorTypes.SERVICEUNAVAILABLE, ex);
184 throw new MusicServiceException(
185 "Error while connecting to Cassandra cluster.. " + ex.getMessage());
196 public DataType returnColumnDataType(String keyspace, String tableName, String columnName) {
197 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
198 TableMetadata table = ks.getTable(tableName);
199 return table.getColumn(columnName).getType();
207 * @return TableMetadata
209 public TableMetadata returnColumnMetadata(String keyspace, String tableName) {
210 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
211 return ks.getTable(tableName);
218 * @return TableMetadata
220 public KeyspaceMetadata returnKeyspaceMetadata(String keyspace) {
221 return cluster.getMetadata().getKeyspace(keyspace);
226 * Utility function to return the Java specific object type.
233 public Object getColValue(Row row, String colName, DataType colType) {
235 switch (colType.getName()) {
237 return row.getString(colName);
239 return row.getUUID(colName);
241 return row.getVarint(colName);
243 return row.getLong(colName);
245 return row.getInt(colName);
247 return row.getFloat(colName);
249 return row.getDouble(colName);
251 return row.getBool(colName);
253 return row.getMap(colName, String.class, String.class);
255 return row.getList(colName, String.class);
261 public byte[] getBlobValue(Row row, String colName, DataType colType) {
262 ByteBuffer bb = row.getBytes(colName);
266 public boolean doesRowSatisfyCondition(Row row, Map<String, Object> condition) throws Exception {
267 ColumnDefinitions colInfo = row.getColumnDefinitions();
269 for (Map.Entry<String, Object> entry : condition.entrySet()) {
270 String colName = entry.getKey();
271 DataType colType = colInfo.getType(colName);
272 Object columnValue = getColValue(row, colName, colType);
273 Object conditionValue = MusicUtil.convertToActualDataType(colType, entry.getValue());
274 if (columnValue.equals(conditionValue) == false)
281 * Utility function to store ResultSet values in to a MAP for output.
286 public Map<String, HashMap<String, Object>> marshalData(ResultSet results) {
287 Map<String, HashMap<String, Object>> resultMap =
290 for (Row row : results) {
291 ColumnDefinitions colInfo = row.getColumnDefinitions();
292 HashMap<String, Object> resultOutput = new HashMap<>();
293 for (Definition definition : colInfo) {
294 if (!(("vector_ts").equals(definition.getName()))) {
295 if(definition.getType().toString().toLowerCase().contains("blob")) {
296 resultOutput.put(definition.getName(),
297 getBlobValue(row, definition.getName(), definition.getType()));
299 resultOutput.put(definition.getName(),
300 getColValue(row, definition.getName(), definition.getType()));
304 resultMap.put("row " + counter, resultOutput);
311 // Prepared Statements 1802 additions
313 public boolean executePut(PreparedQueryObject queryObject, String consistency)
314 throws MusicServiceException, MusicQueryException {
315 return executePut(queryObject, consistency, 0);
318 * This Method performs DDL and DML operations on Cassandra using specified consistency level
320 * @param queryObject Object containing cassandra prepared query and values.
321 * @param consistency Specify consistency level for data synchronization across cassandra
323 * @return Boolean Indicates operation success or failure
324 * @throws MusicServiceException
325 * @throws MusicQueryException
327 public boolean executePut(PreparedQueryObject queryObject, String consistency,long timeSlot)
328 throws MusicServiceException, MusicQueryException {
330 boolean result = false;
331 long timeOfWrite = System.currentTimeMillis();
332 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
333 logger.error(EELFLoggerDelegate.errorLogger, queryObject.getQuery(),AppMessages.QUERYERROR, ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
334 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
335 + queryObject.getQuery() + "]");
337 logger.debug(EELFLoggerDelegate.applicationLogger,
338 "In preprared Execute Put: the actual insert query:"
339 + queryObject.getQuery() + "; the values"
340 + queryObject.getValues());
341 SimpleStatement preparedInsert = null;
344 preparedInsert = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray());
345 if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
346 logger.info(EELFLoggerDelegate.applicationLogger, "Executing critical put query");
347 preparedInsert.setConsistencyLevel(ConsistencyLevel.QUORUM);
348 } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
349 logger.info(EELFLoggerDelegate.applicationLogger, "Executing simple put query");
350 if(queryObject.getConsistency() == null)
351 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
353 preparedInsert.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
354 } else if (consistency.equalsIgnoreCase(MusicUtil.ONE)) {
355 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
356 } else if (consistency.equalsIgnoreCase(MusicUtil.QUORUM)) {
357 preparedInsert.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
358 } else if (consistency.equalsIgnoreCase(MusicUtil.ALL)) {
359 preparedInsert.setConsistencyLevel(ConsistencyLevel.ALL);
361 long timestamp = MusicUtil.v2sTimeStampInMicroseconds(timeSlot, timeOfWrite);
362 preparedInsert.setDefaultTimestamp(timestamp);
364 ResultSet rs = session.execute(preparedInsert);
365 result = rs.wasApplied();
368 catch (AlreadyExistsException ae) {
369 // logger.error(EELFLoggerDelegate.errorLogger,"AlreadExistsException: " + ae.getMessage(),AppMessages.QUERYERROR,
370 // ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
371 throw new MusicQueryException("AlreadyExistsException: " + ae.getMessage(),ae);
372 } catch ( InvalidQueryException e ) {
373 // logger.error(EELFLoggerDelegate.errorLogger,"InvalidQueryException: " + e.getMessage(),AppMessages.SESSIONFAILED + " ["
374 // + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
375 throw new MusicQueryException("InvalidQueryException: " + e.getMessage(),e);
376 } catch (Exception e) {
377 // logger.error(EELFLoggerDelegate.errorLogger,e.getClass().toString() + ":" + e.getMessage(),AppMessages.SESSIONFAILED + " ["
378 // + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, e);
379 throw new MusicServiceException("Executing Session Failure for Request = " + "["
380 + queryObject.getQuery() + "]" + " Reason = " + e.getMessage(),e);
386 * This method performs DDL operations on Cassandra using consistency level ONE.
388 * @param queryObject Object containing cassandra prepared query and values.
390 * @throws MusicServiceException
391 * @throws MusicQueryException
393 public ResultSet executeEventualGet(PreparedQueryObject queryObject)
394 throws MusicServiceException, MusicQueryException {
395 CacheAccess<String, PreparedStatement> queryBank = CachingUtil.getStatementBank();
396 PreparedStatement preparedEventualGet = null;
397 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
398 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
399 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
400 + queryObject.getQuery() + "]");
402 logger.info(EELFLoggerDelegate.applicationLogger,
403 "Executing Eventual get query:" + queryObject.getQuery());
405 ResultSet results = null;
407 if(queryBank.get(queryObject.getQuery()) != null )
408 preparedEventualGet=queryBank.get(queryObject.getQuery());
410 preparedEventualGet = session.prepare(queryObject.getQuery());
411 CachingUtil.updateStatementBank(queryObject.getQuery(), preparedEventualGet);
413 if(queryObject.getConsistency() == null) {
414 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE);
416 preparedEventualGet.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
418 results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
420 } catch (Exception ex) {
421 logger.error("Exception", ex);
422 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
423 throw new MusicServiceException(ex.getMessage());
430 * This method performs DDL operation on Cassandra using consistency level QUORUM.
432 * @param queryObject Object containing cassandra prepared query and values.
434 * @throws MusicServiceException
435 * @throws MusicQueryException
437 public ResultSet executeCriticalGet(PreparedQueryObject queryObject)
438 throws MusicServiceException, MusicQueryException {
439 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
440 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
441 throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
442 + queryObject.getQuery() + "]");
444 logger.info(EELFLoggerDelegate.applicationLogger,
445 "Executing Critical get query:" + queryObject.getQuery());
446 PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
447 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.QUORUM);
448 ResultSet results = null;
450 results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
451 } catch (Exception ex) {
452 logger.error("Exception", ex);
453 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
454 throw new MusicServiceException(ex.getMessage());
460 public ResultSet executeGet(PreparedQueryObject queryObject,String consistencyLevel) throws MusicQueryException, MusicServiceException {
461 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
462 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
463 throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
464 + queryObject.getQuery() + "]");
466 ResultSet results = null;
468 SimpleStatement statement = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray());
470 if (consistencyLevel.equalsIgnoreCase(MusicUtil.ONE)) {
471 if(queryObject.getConsistency() == null) {
472 statement.setConsistencyLevel(ConsistencyLevel.ONE);
474 statement.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
476 } else if (consistencyLevel.equalsIgnoreCase(MusicUtil.QUORUM)) {
477 statement.setConsistencyLevel(ConsistencyLevel.QUORUM);
478 } else if (consistencyLevel.equalsIgnoreCase(MusicUtil.LOCAL_QUORUM)) {
479 statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
482 results = session.execute(statement);
484 } catch (Exception ex) {
485 logger.error(EELFLoggerDelegate.errorLogger, "Execute Get Error" + ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject
486 .getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, ex);
487 throw new MusicServiceException("Execute Get Error" + ex.getMessage());
495 * This method performs DDL operations on Cassandra using consistency level ONE.
497 * @param queryObject Object containing cassandra prepared query and values.
499 public ResultSet executeOneConsistencyGet(PreparedQueryObject queryObject)
500 throws MusicServiceException, MusicQueryException {
501 return executeGet(queryObject, MusicUtil.ONE);
506 * This method performs DDL operation on Cassandra using consistency level LOCAL_QUORUM.
508 * @param queryObject Object containing cassandra prepared query and values.
510 public ResultSet executeLocalQuorumConsistencyGet(PreparedQueryObject queryObject)
511 throws MusicServiceException, MusicQueryException {
512 return executeGet(queryObject, MusicUtil.LOCAL_QUORUM);
517 * This method performs DDL operation on Cassandra using consistency level QUORUM.
519 * @param queryObject Object containing cassandra prepared query and values.
521 public ResultSet executeQuorumConsistencyGet(PreparedQueryObject queryObject)
522 throws MusicServiceException, MusicQueryException {
523 return executeGet(queryObject, MusicUtil.QUORUM);