/*
 * ============LICENSE_START==========================================
 * ===================================================================
 * Copyright (c) 2017 AT&T Intellectual Property
 * ===================================================================
 * Modifications Copyright (c) 2018-2019 IBM
 * Modifications Copyright (c) 2019 Samsung
 * ===================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * ============LICENSE_END=============================================
 * ====================================================================
 */
26 package org.onap.music.datastore;
28 import java.net.InetAddress;
29 import java.net.NetworkInterface;
30 import java.net.SocketException;
31 import java.nio.ByteBuffer;
32 import java.util.ArrayList;
33 import java.util.Enumeration;
34 import java.util.HashMap;
35 import java.util.Iterator;
38 import org.onap.music.eelf.logging.EELFLoggerDelegate;
39 import org.onap.music.eelf.logging.format.AppMessages;
40 import org.onap.music.eelf.logging.format.ErrorSeverity;
41 import org.onap.music.eelf.logging.format.ErrorTypes;
42 import org.onap.music.exceptions.MusicQueryException;
43 import org.onap.music.exceptions.MusicServiceException;
44 import org.onap.music.main.CipherUtil;
45 import org.onap.music.main.MusicUtil;
46 import com.datastax.driver.core.Cluster;
47 import com.datastax.driver.core.ColumnDefinitions;
48 import com.datastax.driver.core.ColumnDefinitions.Definition;
49 import com.datastax.driver.core.ConsistencyLevel;
50 import com.datastax.driver.core.DataType;
51 import com.datastax.driver.core.HostDistance;
52 import com.datastax.driver.core.KeyspaceMetadata;
53 import com.datastax.driver.core.Metadata;
54 import com.datastax.driver.core.PoolingOptions;
55 import com.datastax.driver.core.ResultSet;
56 import com.datastax.driver.core.Row;
57 import com.datastax.driver.core.Session;
58 import com.datastax.driver.core.SimpleStatement;
59 import com.datastax.driver.core.TableMetadata;
60 import com.datastax.driver.core.exceptions.AlreadyExistsException;
61 import com.datastax.driver.core.exceptions.InvalidQueryException;
62 import com.datastax.driver.core.exceptions.NoHostAvailableException;
68 public class MusicDataStore {
70 public static final String CONSISTENCY_LEVEL_ONE = "ONE";
71 public static final String CONSISTENCY_LEVEL_QUORUM = "QUORUM";
72 private Session session;
73 private Cluster cluster;
79 public void setSession(Session session) {
80 this.session = session;
86 public Session getSession() {
93 public void setCluster(Cluster cluster) {
94 this.cluster = cluster;
98 private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class);
/**
 * Creates a data store and immediately connects it to the Cassandra hosts configured in
 * {@code MusicUtil}.
 */
public MusicDataStore() {
    this.connectToCassaCluster();
}
/**
 * Creates a data store around an existing cluster handle and session; no connection is
 * attempted.
 *
 * @param cluster cluster handle to use for metadata lookups
 * @param session session to execute statements on
 */
public MusicDataStore(Cluster cluster, Session session) {
    this.cluster = cluster;
    this.session = session;
}
/**
 * Creates a data store connected to the Cassandra cluster at the given address.
 *
 * <p>A {@link MusicServiceException} raised while connecting is logged and not rethrown,
 * so the instance can be returned without an open session.
 *
 * @param remoteIp comma-separated contact point address(es) of the cluster
 */
public MusicDataStore(String remoteIp) {
    try {
        connectToCassaCluster(remoteIp);
    } catch (MusicServiceException connectFailure) {
        logger.error(EELFLoggerDelegate.errorLogger, connectFailure.getMessage(), connectFailure);
    }
}
134 private ArrayList<String> getAllPossibleLocalIps() {
135 ArrayList<String> allPossibleIps = new ArrayList<>();
137 Enumeration<NetworkInterface> en = NetworkInterface.getNetworkInterfaces();
138 while (en.hasMoreElements()) {
139 NetworkInterface ni = en.nextElement();
140 Enumeration<InetAddress> ee = ni.getInetAddresses();
141 while (ee.hasMoreElements()) {
142 InetAddress ia = ee.nextElement();
143 allPossibleIps.add(ia.getHostAddress());
146 } catch (SocketException e) {
147 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.CONNCECTIVITYERROR,
148 ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR, e);
149 }catch(Exception e) {
150 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), ErrorSeverity.ERROR, ErrorTypes
151 .GENERALSERVICEERROR, e);
153 return allPossibleIps;
157 * This method iterates through all available IP addresses and connects to multiple cassandra
160 private void connectToCassaCluster() {
161 Iterator<String> it = getAllPossibleLocalIps().iterator();
162 String address = "localhost";
163 String[] addresses = null;
164 address = MusicUtil.getMyCassaHost();
165 addresses = address.split(",");
167 logger.info(EELFLoggerDelegate.applicationLogger,
168 "Connecting to cassa cluster: Iterating through possible ips:"
169 + getAllPossibleLocalIps());
170 PoolingOptions poolingOptions = new PoolingOptions();
172 .setConnectionsPerHost(HostDistance.LOCAL, 4, 10)
173 .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
174 while (it.hasNext()) {
176 if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
177 String cassPwd = CipherUtil.decryptPKC(MusicUtil.getCassPwd());
178 logger.info(EELFLoggerDelegate.applicationLogger,
179 "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd());
180 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
181 .withCredentials(MusicUtil.getCassName(), cassPwd)
182 //.withLoadBalancingPolicy(new RoundRobinPolicy())
183 .withoutJMXReporting()
184 .withPoolingOptions(poolingOptions)
185 .addContactPoints(addresses).build();
188 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
189 //.withLoadBalancingPolicy(new RoundRobinPolicy())
190 .addContactPoints(addresses).build();
192 Metadata metadata = cluster.getMetadata();
193 logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
194 + metadata.getClusterName() + " at " + address);
195 session = cluster.connect();
198 } catch (NoHostAvailableException e) {
200 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.HOSTUNAVAILABLE,
201 ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR, e);
// NOTE(review): the body of close() is not visible in this view; presumably it releases
// the Cassandra session held by this data store. Confirm against the full file.
209 public void close() {
214 * This method connects to cassandra cluster on specific address.
218 private void connectToCassaCluster(String address) throws MusicServiceException {
219 String[] addresses = null;
220 addresses = address.split(",");
221 PoolingOptions poolingOptions = new PoolingOptions();
223 .setConnectionsPerHost(HostDistance.LOCAL, 4, 10)
224 .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
225 if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
226 String cassPwd = CipherUtil.decryptPKC(MusicUtil.getCassPwd());
227 logger.info(EELFLoggerDelegate.applicationLogger,
228 "Building with credentials "+MusicUtil.getCassName()+" & "+ MusicUtil.getCassPwd());
229 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
230 .withCredentials(MusicUtil.getCassName(), cassPwd)
231 //.withLoadBalancingPolicy(new RoundRobinPolicy())
232 .withoutJMXReporting()
233 .withPoolingOptions(poolingOptions)
234 .addContactPoints(addresses).build();
236 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
237 .withoutJMXReporting()
238 .withPoolingOptions(poolingOptions)
239 .addContactPoints(addresses).build();
242 Metadata metadata = cluster.getMetadata();
243 logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
244 + metadata.getClusterName() + " at " + address);
246 session = cluster.connect();
247 } catch (Exception ex) {
248 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.CASSANDRACONNECTIVITY,
249 ErrorSeverity.ERROR, ErrorTypes.SERVICEUNAVAILABLE, ex);
250 throw new MusicServiceException(
251 "Error while connecting to Cassandra cluster.. " + ex.getMessage());
262 public DataType returnColumnDataType(String keyspace, String tableName, String columnName) {
263 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
264 TableMetadata table = ks.getTable(tableName);
265 return table.getColumn(columnName).getType();
273 * @return TableMetadata
275 public TableMetadata returnColumnMetadata(String keyspace, String tableName) {
276 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
277 return ks.getTable(tableName);
284 * @return TableMetadata
286 public KeyspaceMetadata returnKeyspaceMetadata(String keyspace) {
287 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
293 * Utility function to return the Java specific object type.
// Reads column `colName` from `row` with the typed Row getter chosen by a switch on
// colType.getName(), and returns the value as a plain Java object.
// NOTE(review): the `case` labels and any default branch are not visible in this view, so
// the type-to-getter mapping is described from the getters alone. Confirm against the
// full file.
300 public Object getColValue(Row row, String colName, DataType colType) {
302 switch (colType.getName()) {
// string-valued column
304 return row.getString(colName);
// UUID-valued column
306 return row.getUUID(colName);
// arbitrary-precision integer column
308 return row.getVarint(colName);
// 64-bit integer column
310 return row.getLong(colName);
// 32-bit integer column
312 return row.getInt(colName);
// single-precision floating point column
314 return row.getFloat(colName);
// double-precision floating point column
316 return row.getDouble(colName);
// boolean column
318 return row.getBool(colName);
// map column; keys and values are both read as String
320 return row.getMap(colName, String.class, String.class);
// list column; elements are read as String
322 return row.getList(colName, String.class);
328 public byte[] getBlobValue(Row row, String colName, DataType colType) {
329 ByteBuffer bb = row.getBytes(colName);
333 public boolean doesRowSatisfyCondition(Row row, Map<String, Object> condition) throws Exception {
334 ColumnDefinitions colInfo = row.getColumnDefinitions();
336 for (Map.Entry<String, Object> entry : condition.entrySet()) {
337 String colName = entry.getKey();
338 DataType colType = colInfo.getType(colName);
339 Object columnValue = getColValue(row, colName, colType);
340 Object conditionValue = MusicUtil.convertToActualDataType(colType, entry.getValue());
341 if (columnValue.equals(conditionValue) == false)
348 * Utility function to store ResultSet values in to a MAP for output.
353 public Map<String, HashMap<String, Object>> marshalData(ResultSet results) {
354 Map<String, HashMap<String, Object>> resultMap =
357 for (Row row : results) {
358 ColumnDefinitions colInfo = row.getColumnDefinitions();
359 HashMap<String, Object> resultOutput = new HashMap<>();
360 for (Definition definition : colInfo) {
361 if (!(("vector_ts").equals(definition.getName()))) {
362 if(definition.getType().toString().toLowerCase().contains("blob")) {
363 resultOutput.put(definition.getName(),
364 getBlobValue(row, definition.getName(), definition.getType()));
366 resultOutput.put(definition.getName(),
367 getColValue(row, definition.getName(), definition.getType()));
371 resultMap.put("row " + counter, resultOutput);
378 // Prepared Statements 1802 additions
380 public boolean executePut(PreparedQueryObject queryObject, String consistency)
381 throws MusicServiceException, MusicQueryException {
382 return executePut(queryObject, consistency, 0);
385 * This Method performs DDL and DML operations on Cassandra using specified consistency level
387 * @param queryObject Object containing cassandra prepared query and values.
388 * @param consistency Specify consistency level for data synchronization across cassandra
390 * @return Boolean Indicates operation success or failure
391 * @throws MusicServiceException
392 * @throws MusicQueryException
394 public boolean executePut(PreparedQueryObject queryObject, String consistency,long timeSlot)
395 throws MusicServiceException, MusicQueryException {
397 boolean result = false;
398 long timeOfWrite = System.currentTimeMillis();
399 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
400 logger.error(EELFLoggerDelegate.errorLogger, queryObject.getQuery(),AppMessages.QUERYERROR, ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
401 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
402 + queryObject.getQuery() + "]");
404 logger.debug(EELFLoggerDelegate.applicationLogger,
405 "In preprared Execute Put: the actual insert query:"
406 + queryObject.getQuery() + "; the values"
407 + queryObject.getValues());
408 SimpleStatement preparedInsert = null;
411 preparedInsert = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray());
412 if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
413 logger.info(EELFLoggerDelegate.applicationLogger, "Executing critical put query");
414 preparedInsert.setConsistencyLevel(ConsistencyLevel.QUORUM);
415 } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
416 logger.info(EELFLoggerDelegate.applicationLogger, "Executing simple put query");
417 if(queryObject.getConsistency() == null)
418 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
420 preparedInsert.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
421 } else if (consistency.equalsIgnoreCase(MusicUtil.ONE)) {
422 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
423 } else if (consistency.equalsIgnoreCase(MusicUtil.QUORUM)) {
424 preparedInsert.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
425 } else if (consistency.equalsIgnoreCase(MusicUtil.ALL)) {
426 preparedInsert.setConsistencyLevel(ConsistencyLevel.ALL);
428 long timestamp = MusicUtil.v2sTimeStampInMicroseconds(timeSlot, timeOfWrite);
429 preparedInsert.setDefaultTimestamp(timestamp);
431 ResultSet rs = session.execute(preparedInsert);
432 result = rs.wasApplied();
435 catch (AlreadyExistsException ae) {
436 // logger.error(EELFLoggerDelegate.errorLogger,"AlreadExistsException: " + ae.getMessage(),AppMessages.QUERYERROR,
437 // ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
438 throw new MusicQueryException("AlreadyExistsException: " + ae.getMessage(),ae);
439 } catch ( InvalidQueryException e ) {
440 // logger.error(EELFLoggerDelegate.errorLogger,"InvalidQueryException: " + e.getMessage(),AppMessages.SESSIONFAILED + " ["
441 // + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
442 throw new MusicQueryException("InvalidQueryException: " + e.getMessage(),e);
443 } catch (Exception e) {
444 // logger.error(EELFLoggerDelegate.errorLogger,e.getClass().toString() + ":" + e.getMessage(),AppMessages.SESSIONFAILED + " ["
445 // + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, e);
446 throw new MusicServiceException("Executing Session Failure for Request = " + "["
447 + queryObject.getQuery() + "]" + " Reason = " + e.getMessage(),e);
453 * This method performs DDL operations on Cassandra using consistency level ONE.
455 * @param queryObject Object containing cassandra prepared query and values.
457 * @throws MusicServiceException
458 * @throws MusicQueryException
460 public ResultSet executeEventualGet(PreparedQueryObject queryObject)
461 throws MusicServiceException, MusicQueryException {
462 CacheAccess<String, PreparedStatement> queryBank = CachingUtil.getStatementBank();
463 PreparedStatement preparedEventualGet = null;
464 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
465 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
466 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
467 + queryObject.getQuery() + "]");
469 logger.info(EELFLoggerDelegate.applicationLogger,
470 "Executing Eventual get query:" + queryObject.getQuery());
472 ResultSet results = null;
474 if(queryBank.get(queryObject.getQuery()) != null )
475 preparedEventualGet=queryBank.get(queryObject.getQuery());
477 preparedEventualGet = session.prepare(queryObject.getQuery());
478 CachingUtil.updateStatementBank(queryObject.getQuery(), preparedEventualGet);
480 if(queryObject.getConsistency() == null) {
481 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE);
483 preparedEventualGet.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
485 results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
487 } catch (Exception ex) {
488 logger.error("Exception", ex);
489 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
490 throw new MusicServiceException(ex.getMessage());
497 * This method performs DDL operation on Cassandra using consistency level QUORUM.
499 * @param queryObject Object containing cassandra prepared query and values.
501 * @throws MusicServiceException
502 * @throws MusicQueryException
504 public ResultSet executeCriticalGet(PreparedQueryObject queryObject)
505 throws MusicServiceException, MusicQueryException {
506 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
507 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
508 throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
509 + queryObject.getQuery() + "]");
511 logger.info(EELFLoggerDelegate.applicationLogger,
512 "Executing Critical get query:" + queryObject.getQuery());
513 PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
514 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.QUORUM);
515 ResultSet results = null;
517 results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
518 } catch (Exception ex) {
519 logger.error("Exception", ex);
520 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
521 throw new MusicServiceException(ex.getMessage());
527 public ResultSet executeGet(PreparedQueryObject queryObject,String consistencyLevel) throws MusicQueryException, MusicServiceException {
528 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
529 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
530 throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
531 + queryObject.getQuery() + "]");
533 ResultSet results = null;
535 SimpleStatement statement = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray());
537 if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_ONE)) {
538 if(queryObject.getConsistency() == null) {
539 statement.setConsistencyLevel(ConsistencyLevel.ONE);
541 statement.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
544 else if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_QUORUM)) {
545 statement.setConsistencyLevel(ConsistencyLevel.QUORUM);
548 results = session.execute(statement);
550 } catch (Exception ex) {
551 logger.error(EELFLoggerDelegate.errorLogger, "Execute Get Error" + ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject
552 .getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, ex);
553 throw new MusicServiceException("Execute Get Error" + ex.getMessage());
561 * This method performs DDL operations on Cassandra using consistency level ONE.
563 * @param queryObject Object containing cassandra prepared query and values.
565 public ResultSet executeOneConsistencyGet(PreparedQueryObject queryObject)
566 throws MusicServiceException, MusicQueryException {
567 return executeGet(queryObject, CONSISTENCY_LEVEL_ONE);
572 * This method performs DDL operation on Cassandra using consistency level QUORUM.
574 * @param queryObject Object containing cassandra prepared query and values.
576 public ResultSet executeQuorumConsistencyGet(PreparedQueryObject queryObject)
577 throws MusicServiceException, MusicQueryException {
578 return executeGet(queryObject, CONSISTENCY_LEVEL_QUORUM);