2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * ===================================================================
7 * Modifications Copyright (c) 2018-2019 IBM
8 * Modifications Copyright (c) 2019 Samsung
9 * ===================================================================
10 * Licensed under the Apache License, Version 2.0 (the "License");
11 * you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
14 * http://www.apache.org/licenses/LICENSE-2.0
16 * Unless required by applicable law or agreed to in writing, software
17 * distributed under the License is distributed on an "AS IS" BASIS,
18 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 * See the License for the specific language governing permissions and
20 * limitations under the License.
22 * ============LICENSE_END=============================================
23 * ====================================================================
26 package org.onap.music.datastore;
28 import java.net.InetAddress;
29 import java.net.NetworkInterface;
30 import java.net.SocketException;
31 import java.nio.ByteBuffer;
32 import java.util.ArrayList;
33 import java.util.Enumeration;
34 import java.util.HashMap;
35 import java.util.Iterator;
38 import org.onap.music.eelf.logging.EELFLoggerDelegate;
39 import org.onap.music.eelf.logging.format.AppMessages;
40 import org.onap.music.eelf.logging.format.ErrorSeverity;
41 import org.onap.music.eelf.logging.format.ErrorTypes;
42 import org.onap.music.exceptions.MusicQueryException;
43 import org.onap.music.exceptions.MusicServiceException;
44 import org.onap.music.lockingservice.cassandra.LockType;
45 import org.onap.music.main.CipherUtil;
46 import org.onap.music.main.MusicUtil;
47 import com.datastax.driver.core.Cluster;
48 import com.datastax.driver.core.ColumnDefinitions;
49 import com.datastax.driver.core.ColumnDefinitions.Definition;
50 import com.datastax.driver.core.ConsistencyLevel;
51 import com.datastax.driver.core.DataType;
52 import com.datastax.driver.core.HostDistance;
53 import com.datastax.driver.core.KeyspaceMetadata;
54 import com.datastax.driver.core.Metadata;
55 import com.datastax.driver.core.PoolingOptions;
56 import com.datastax.driver.core.ResultSet;
57 import com.datastax.driver.core.Row;
58 import com.datastax.driver.core.Session;
59 import com.datastax.driver.core.SimpleStatement;
60 import com.datastax.driver.core.TableMetadata;
61 import com.datastax.driver.core.TypeCodec;
62 import com.datastax.driver.core.exceptions.AlreadyExistsException;
63 import com.datastax.driver.core.exceptions.InvalidQueryException;
64 import com.datastax.driver.core.exceptions.NoHostAvailableException;
65 import com.datastax.driver.extras.codecs.enums.EnumNameCodec;
66 import com.datastax.driver.extras.codecs.enums.EnumOrdinalCodec;
72 public class MusicDataStore {
// Consistency-level names accepted by executeGet, which compares them case-insensitively.
74 public static final String CONSISTENCY_LEVEL_ONE = "ONE";
75 public static final String CONSISTENCY_LEVEL_QUORUM = "QUORUM";
// Session used to execute statements; assigned by connectToCassaCluster and setSession.
76 private Session session;
// Cluster handle; assigned by connectToCassaCluster and setCluster.
77 private Cluster cluster;
81 * Connect to default Cassandra address
83 public MusicDataStore() {
85 connectToCassaCluster(MusicUtil.getMyCassaHost());
86 } catch (MusicServiceException e) {
87 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), e);
// Creates a data store from an already-built cluster and session; the given session is stored as-is.
// NOTE(review): the statement that handles the `cluster` argument is not visible in these lines - confirm.
96 public MusicDataStore(Cluster cluster, Session session) {
97 this.session = session;
// Replaces the session this data store uses to execute statements.
105 public void setSession(Session session) {
106 this.session = session;
112 public Session getSession() {
119 public void setCluster(Cluster cluster) {
120 EnumNameCodec<LockType> lockTypeCodec = new EnumNameCodec<LockType>(LockType.class);
121 cluster.getConfiguration().getCodecRegistry().register(lockTypeCodec);
123 this.cluster = cluster;
126 public Cluster getCluster() {
// Per-instance logger for this class, obtained from EELFLoggerDelegate.
131 private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class);
137 * @throws MusicServiceException
/**
 * Creates a data store connected to the Cassandra cluster at the given address.
 *
 * A {@link MusicServiceException} raised while connecting is logged and not
 * rethrown, so construction completes even when no session could be opened.
 *
 * @param remoteIp contact point(s) handed to {@code connectToCassaCluster}
 */
public MusicDataStore(String remoteIp) {
    try {
        connectToCassaCluster(remoteIp);
    } catch (MusicServiceException connectFailure) {
        logger.error(EELFLoggerDelegate.errorLogger, connectFailure.getMessage(), connectFailure);
    }
}
150 public void close() {
155 * This method connects to cassandra cluster on specific address.
159 private void connectToCassaCluster(String address) throws MusicServiceException {
160 String[] addresses = null;
161 addresses = address.split(",");
162 PoolingOptions poolingOptions = new PoolingOptions();
164 .setConnectionsPerHost(HostDistance.LOCAL, 4, 10)
165 .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
168 if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
169 String cassPwd = CipherUtil.decryptPKC(MusicUtil.getCassPwd());
170 logger.info(EELFLoggerDelegate.applicationLogger,
171 "Building with credentials "+MusicUtil.getCassName()+" & "+ MusicUtil.getCassPwd());
172 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
173 .withCredentials(MusicUtil.getCassName(), cassPwd)
174 //.withLoadBalancingPolicy(new RoundRobinPolicy())
175 .withoutJMXReporting()
176 .withPoolingOptions(poolingOptions)
177 .addContactPoints(addresses).build();
179 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
180 .withoutJMXReporting()
181 .withPoolingOptions(poolingOptions)
182 .addContactPoints(addresses)
186 this.setCluster(cluster);
187 Metadata metadata = this.cluster.getMetadata();
188 logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
189 + metadata.getClusterName() + " at " + address);
192 session = this.cluster.connect();
193 } catch (Exception ex) {
194 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.CASSANDRACONNECTIVITY,
195 ErrorSeverity.ERROR, ErrorTypes.SERVICEUNAVAILABLE, ex);
196 throw new MusicServiceException(
197 "Error while connecting to Cassandra cluster.. " + ex.getMessage());
208 public DataType returnColumnDataType(String keyspace, String tableName, String columnName) {
209 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
210 TableMetadata table = ks.getTable(tableName);
211 return table.getColumn(columnName).getType();
219 * @return TableMetadata
221 public TableMetadata returnColumnMetadata(String keyspace, String tableName) {
222 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
223 return ks.getTable(tableName);
230 * @return KeyspaceMetadata
232 public KeyspaceMetadata returnKeyspaceMetadata(String keyspace) {
233 return cluster.getMetadata().getKeyspace(keyspace);
238 * Utility function to return the Java specific object type.
// Reads column `colName` from `row`, choosing the driver getter by switching on
// the column's type name (colType.getName()).
// Map columns are read as String-to-String maps and list columns as lists of String.
// NOTE(review): the `case` labels of this switch are not visible in these lines,
// so the type-name-to-getter mapping should be confirmed against the full file.
245 public Object getColValue(Row row, String colName, DataType colType) {
247 switch (colType.getName()) {
249 return row.getString(colName);
251 return row.getUUID(colName);
253 return row.getVarint(colName);
255 return row.getLong(colName);
257 return row.getInt(colName);
259 return row.getFloat(colName);
261 return row.getDouble(colName);
263 return row.getBool(colName);
265 return row.getMap(colName, String.class, String.class);
267 return row.getList(colName, String.class);
// Reads column `colName` from `row` as raw bytes via row.getBytes.
// `colType` is not used in the lines visible here.
// NOTE(review): the conversion from the ByteBuffer to the returned byte[] is not visible - confirm.
273 public byte[] getBlobValue(Row row, String colName, DataType colType) {
274 ByteBuffer bb = row.getBytes(colName);
278 public boolean doesRowSatisfyCondition(Row row, Map<String, Object> condition) throws Exception {
279 ColumnDefinitions colInfo = row.getColumnDefinitions();
281 for (Map.Entry<String, Object> entry : condition.entrySet()) {
282 String colName = entry.getKey();
283 DataType colType = colInfo.getType(colName);
284 Object columnValue = getColValue(row, colName, colType);
285 Object conditionValue = MusicUtil.convertToActualDataType(colType, entry.getValue());
286 if (columnValue.equals(conditionValue) == false)
293 * Utility function to store ResultSet values in to a MAP for output.
// Converts a ResultSet into a map with one entry per row; each entry maps column name to value.
// The column named "vector_ts" is left out of the output.
// Columns whose type name contains "blob" are read with getBlobValue, all others with getColValue.
// Rows are stored under the key "row " + counter.
// NOTE(review): the initialisation and increment of `counter` are not visible in
// these lines, so the first row's index should be confirmed against the full file.
298 public Map<String, HashMap<String, Object>> marshalData(ResultSet results) {
299 Map<String, HashMap<String, Object>> resultMap =
302 for (Row row : results) {
303 ColumnDefinitions colInfo = row.getColumnDefinitions();
304 HashMap<String, Object> resultOutput = new HashMap<>();
305 for (Definition definition : colInfo) {
306 if (!(("vector_ts").equals(definition.getName()))) {
307 if(definition.getType().toString().toLowerCase().contains("blob")) {
308 resultOutput.put(definition.getName(),
309 getBlobValue(row, definition.getName(), definition.getType()));
311 resultOutput.put(definition.getName(),
312 getColValue(row, definition.getName(), definition.getType()));
316 resultMap.put("row " + counter, resultOutput);
323 // Prepared Statements 1802 additions
325 public boolean executePut(PreparedQueryObject queryObject, String consistency)
326 throws MusicServiceException, MusicQueryException {
327 return executePut(queryObject, consistency, 0);
330 * This Method performs DDL and DML operations on Cassandra using specified consistency level
332 * @param queryObject Object containing cassandra prepared query and values.
333 * @param consistency Specify consistency level for data synchronization across cassandra
335 * @return Boolean Indicates operation success or failure
336 * @throws MusicServiceException
337 * @throws MusicQueryException
339 public boolean executePut(PreparedQueryObject queryObject, String consistency,long timeSlot)
340 throws MusicServiceException, MusicQueryException {
342 boolean result = false;
343 long timeOfWrite = System.currentTimeMillis();
344 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
345 logger.error(EELFLoggerDelegate.errorLogger, queryObject.getQuery(),AppMessages.QUERYERROR, ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
346 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
347 + queryObject.getQuery() + "]");
349 logger.debug(EELFLoggerDelegate.applicationLogger,
350 "In preprared Execute Put: the actual insert query:"
351 + queryObject.getQuery() + "; the values"
352 + queryObject.getValues());
353 SimpleStatement preparedInsert = null;
356 preparedInsert = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray());
357 if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
358 logger.info(EELFLoggerDelegate.applicationLogger, "Executing critical put query");
359 preparedInsert.setConsistencyLevel(ConsistencyLevel.QUORUM);
360 } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
361 logger.info(EELFLoggerDelegate.applicationLogger, "Executing simple put query");
362 if(queryObject.getConsistency() == null)
363 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
365 preparedInsert.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
366 } else if (consistency.equalsIgnoreCase(MusicUtil.ONE)) {
367 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
368 } else if (consistency.equalsIgnoreCase(MusicUtil.QUORUM)) {
369 preparedInsert.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
370 } else if (consistency.equalsIgnoreCase(MusicUtil.ALL)) {
371 preparedInsert.setConsistencyLevel(ConsistencyLevel.ALL);
373 long timestamp = MusicUtil.v2sTimeStampInMicroseconds(timeSlot, timeOfWrite);
374 preparedInsert.setDefaultTimestamp(timestamp);
376 ResultSet rs = session.execute(preparedInsert);
377 result = rs.wasApplied();
380 catch (AlreadyExistsException ae) {
381 // logger.error(EELFLoggerDelegate.errorLogger,"AlreadExistsException: " + ae.getMessage(),AppMessages.QUERYERROR,
382 // ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
383 throw new MusicQueryException("AlreadyExistsException: " + ae.getMessage(),ae);
384 } catch ( InvalidQueryException e ) {
385 // logger.error(EELFLoggerDelegate.errorLogger,"InvalidQueryException: " + e.getMessage(),AppMessages.SESSIONFAILED + " ["
386 // + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
387 throw new MusicQueryException("InvalidQueryException: " + e.getMessage(),e);
388 } catch (Exception e) {
389 // logger.error(EELFLoggerDelegate.errorLogger,e.getClass().toString() + ":" + e.getMessage(),AppMessages.SESSIONFAILED + " ["
390 // + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, e);
391 throw new MusicServiceException("Executing Session Failure for Request = " + "["
392 + queryObject.getQuery() + "]" + " Reason = " + e.getMessage(),e);
398 * This method performs DDL operations on Cassandra using consistency level ONE.
400 * @param queryObject Object containing cassandra prepared query and values.
402 * @throws MusicServiceException
403 * @throws MusicQueryException
// Executes a query through a prepared statement looked up in (or added to) the statement bank.
// Consistency is ONE unless the query object carries its own consistency.
// NOTE(review): PreparedStatement, CacheAccess and CachingUtil are not in the visible import
// list - presumably this method is disabled or commented out in the full file; confirm before relying on it.
// NOTE(review): setConsistencyLevel is called on the statement taken from the bank, so the
// change appears to be shared with every other user of that cached statement - confirm.
405 public ResultSet executeEventualGet(PreparedQueryObject queryObject)
406 throws MusicServiceException, MusicQueryException {
407 CacheAccess<String, PreparedStatement> queryBank = CachingUtil.getStatementBank();
408 PreparedStatement preparedEventualGet = null;
409 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
410 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
411 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
412 + queryObject.getQuery() + "]");
414 logger.info(EELFLoggerDelegate.applicationLogger,
415 "Executing Eventual get query:" + queryObject.getQuery());
417 ResultSet results = null;
419 if(queryBank.get(queryObject.getQuery()) != null )
420 preparedEventualGet=queryBank.get(queryObject.getQuery());
422 preparedEventualGet = session.prepare(queryObject.getQuery());
423 CachingUtil.updateStatementBank(queryObject.getQuery(), preparedEventualGet);
425 if(queryObject.getConsistency() == null) {
426 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE);
428 preparedEventualGet.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
430 results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
432 } catch (Exception ex) {
433 logger.error("Exception", ex);
434 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
435 throw new MusicServiceException(ex.getMessage());
442 * This method performs DDL operation on Cassandra using consistency level QUORUM.
444 * @param queryObject Object containing cassandra prepared query and values.
446 * @throws MusicServiceException
447 * @throws MusicQueryException
// Prepares the query on the session and executes it with consistency level QUORUM.
// Any exception raised during execution is logged and rethrown as MusicServiceException
// carrying only the original message.
// NOTE(review): PreparedStatement is not in the visible import list - presumably this method
// is disabled or commented out in the full file; confirm before relying on it.
// NOTE(review): session.prepare(...) sits before the lines that catch exceptions, so a failure
// while preparing does not appear to be wrapped - confirm.
449 public ResultSet executeCriticalGet(PreparedQueryObject queryObject)
450 throws MusicServiceException, MusicQueryException {
451 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
452 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
453 throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
454 + queryObject.getQuery() + "]");
456 logger.info(EELFLoggerDelegate.applicationLogger,
457 "Executing Critical get query:" + queryObject.getQuery());
458 PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
459 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.QUORUM);
460 ResultSet results = null;
462 results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
463 } catch (Exception ex) {
464 logger.error("Exception", ex);
465 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
466 throw new MusicServiceException(ex.getMessage());
472 public ResultSet executeGet(PreparedQueryObject queryObject,String consistencyLevel) throws MusicQueryException, MusicServiceException {
473 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
474 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
475 throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
476 + queryObject.getQuery() + "]");
478 ResultSet results = null;
480 SimpleStatement statement = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray());
482 if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_ONE)) {
483 if(queryObject.getConsistency() == null) {
484 statement.setConsistencyLevel(ConsistencyLevel.ONE);
486 statement.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
489 else if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_QUORUM)) {
490 statement.setConsistencyLevel(ConsistencyLevel.QUORUM);
493 results = session.execute(statement);
495 } catch (Exception ex) {
496 logger.error(EELFLoggerDelegate.errorLogger, "Execute Get Error" + ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject
497 .getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, ex);
498 throw new MusicServiceException("Execute Get Error" + ex.getMessage());
506 * This method executes a query on Cassandra and returns its result set, using consistency level ONE unless the query object specifies its own consistency.
508 * @param queryObject Object containing cassandra prepared query and values.
510 public ResultSet executeOneConsistencyGet(PreparedQueryObject queryObject)
511 throws MusicServiceException, MusicQueryException {
512 return executeGet(queryObject, CONSISTENCY_LEVEL_ONE);
517 * This method executes a query on Cassandra and returns its result set, using consistency level QUORUM.
519 * @param queryObject Object containing cassandra prepared query and values.
521 public ResultSet executeQuorumConsistencyGet(PreparedQueryObject queryObject)
522 throws MusicServiceException, MusicQueryException {
523 return executeGet(queryObject, CONSISTENCY_LEVEL_QUORUM);