2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * ===================================================================
7 * Modifications Copyright (c) 2018-2019 IBM
8 * Modifications Copyright (c) 2019 Samsung
9 * ===================================================================
10 * Licensed under the Apache License, Version 2.0 (the "License");
11 * you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
14 * http://www.apache.org/licenses/LICENSE-2.0
16 * Unless required by applicable law or agreed to in writing, software
17 * distributed under the License is distributed on an "AS IS" BASIS,
18 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 * See the License for the specific language governing permissions and
20 * limitations under the License.
22 * ============LICENSE_END=============================================
23 * ====================================================================
26 package org.onap.music.datastore;
28 import java.net.InetAddress;
29 import java.net.NetworkInterface;
30 import java.net.SocketException;
31 import java.nio.ByteBuffer;
32 import java.util.ArrayList;
33 import java.util.Enumeration;
34 import java.util.HashMap;
35 import java.util.Iterator;
38 import org.apache.commons.jcs.access.CacheAccess;
39 import org.onap.music.eelf.logging.EELFLoggerDelegate;
40 import org.onap.music.eelf.logging.format.AppMessages;
41 import org.onap.music.eelf.logging.format.ErrorSeverity;
42 import org.onap.music.eelf.logging.format.ErrorTypes;
43 import org.onap.music.exceptions.MusicQueryException;
44 import org.onap.music.exceptions.MusicServiceException;
45 import org.onap.music.main.MusicUtil;
46 import com.codahale.metrics.JmxReporter;
47 import com.datastax.driver.core.Cluster;
48 import com.datastax.driver.core.ColumnDefinitions;
49 import com.datastax.driver.core.ColumnDefinitions.Definition;
50 import com.datastax.driver.core.ConsistencyLevel;
51 import com.datastax.driver.core.DataType;
52 import com.datastax.driver.core.HostDistance;
53 import com.datastax.driver.core.KeyspaceMetadata;
54 import com.datastax.driver.core.Metadata;
55 import com.datastax.driver.core.PoolingOptions;
56 import com.datastax.driver.core.PreparedStatement;
57 import com.datastax.driver.core.ResultSet;
58 import com.datastax.driver.core.Row;
59 import com.datastax.driver.core.Session;
60 import com.datastax.driver.core.SimpleStatement;
61 import com.datastax.driver.core.TableMetadata;
62 import com.datastax.driver.core.exceptions.AlreadyExistsException;
63 import com.datastax.driver.core.exceptions.InvalidQueryException;
64 import com.datastax.driver.core.exceptions.NoHostAvailableException;
65 import com.sun.jersey.core.util.Base64;
71 public class MusicDataStore {
73 public static final String CONSISTENCY_LEVEL_ONE = "ONE";
74 public static final String CONSISTENCY_LEVEL_QUORUM = "QUORUM";
75 private Session session;
76 private Cluster cluster;
82 public void setSession(Session session) {
83 this.session = session;
89 public Session getSession() {
96 public void setCluster(Cluster cluster) {
97 this.cluster = cluster;
101 private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class);
106 public MusicDataStore() {
107 connectToCassaCluster();
115 public MusicDataStore(Cluster cluster, Session session) {
116 this.session = session;
117 this.cluster = cluster;
123 * @throws MusicServiceException
125 public MusicDataStore(String remoteIp) {
127 connectToCassaCluster(remoteIp);
128 } catch (MusicServiceException e) {
129 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), e);
137 private ArrayList<String> getAllPossibleLocalIps() {
138 ArrayList<String> allPossibleIps = new ArrayList<>();
140 Enumeration<NetworkInterface> en = NetworkInterface.getNetworkInterfaces();
141 while (en.hasMoreElements()) {
142 NetworkInterface ni = en.nextElement();
143 Enumeration<InetAddress> ee = ni.getInetAddresses();
144 while (ee.hasMoreElements()) {
145 InetAddress ia = ee.nextElement();
146 allPossibleIps.add(ia.getHostAddress());
149 } catch (SocketException e) {
150 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.CONNCECTIVITYERROR,
151 ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR, e);
152 }catch(Exception e) {
153 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), ErrorSeverity.ERROR, ErrorTypes
154 .GENERALSERVICEERROR, e);
156 return allPossibleIps;
160 * This method iterates through all available IP addresses and connects to multiple cassandra
163 private void connectToCassaCluster() {
164 Iterator<String> it = getAllPossibleLocalIps().iterator();
165 String address = "localhost";
166 String[] addresses = null;
167 address = MusicUtil.getMyCassaHost();
168 addresses = address.split(",");
170 logger.info(EELFLoggerDelegate.applicationLogger,
171 "Connecting to cassa cluster: Iterating through possible ips:"
172 + getAllPossibleLocalIps());
173 PoolingOptions poolingOptions = new PoolingOptions();
175 .setConnectionsPerHost(HostDistance.LOCAL, 4, 10)
176 .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
177 while (it.hasNext()) {
179 if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
180 logger.info(EELFLoggerDelegate.applicationLogger,
181 "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd());
182 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
183 .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
184 //.withLoadBalancingPolicy(new RoundRobinPolicy())
185 .withoutJMXReporting()
186 .withPoolingOptions(poolingOptions)
187 .addContactPoints(addresses).build();
190 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
191 //.withLoadBalancingPolicy(new RoundRobinPolicy())
192 .addContactPoints(addresses).build();
194 Metadata metadata = cluster.getMetadata();
195 logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
196 + metadata.getClusterName() + " at " + address);
197 session = cluster.connect();
200 } catch (NoHostAvailableException e) {
202 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.HOSTUNAVAILABLE,
203 ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR, e);
211 public void close() {
216 * This method connects to cassandra cluster on specific address.
220 private void connectToCassaCluster(String address) throws MusicServiceException {
221 String[] addresses = null;
222 addresses = address.split(",");
223 PoolingOptions poolingOptions = new PoolingOptions();
225 .setConnectionsPerHost(HostDistance.LOCAL, 4, 10)
226 .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
227 if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
228 logger.info(EELFLoggerDelegate.applicationLogger,
229 "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd());
230 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
231 .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
232 //.withLoadBalancingPolicy(new RoundRobinPolicy())
233 .withoutJMXReporting()
234 .withPoolingOptions(poolingOptions)
235 .addContactPoints(addresses).build();
237 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
238 //.withLoadBalancingPolicy(new RoundRobinPolicy())
239 .withoutJMXReporting()
240 .withPoolingOptions(poolingOptions)
241 .addContactPoints(addresses).build();
244 // JmxReporter reporter =
245 // JmxReporter.forRegistry(cluster.getMetrics().getRegistry())
246 // .inDomain(cluster.getClusterName() + "-metrics")
251 Metadata metadata = cluster.getMetadata();
252 logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
253 + metadata.getClusterName() + " at " + address);
255 session = cluster.connect();
256 } catch (Exception ex) {
257 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.CASSANDRACONNECTIVITY,
258 ErrorSeverity.ERROR, ErrorTypes.SERVICEUNAVAILABLE, ex);
259 throw new MusicServiceException(
260 "Error while connecting to Cassandra cluster.. " + ex.getMessage());
271 public DataType returnColumnDataType(String keyspace, String tableName, String columnName) {
272 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
273 TableMetadata table = ks.getTable(tableName);
274 return table.getColumn(columnName).getType();
282 * @return TableMetadata
284 public TableMetadata returnColumnMetadata(String keyspace, String tableName) {
285 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
286 return ks.getTable(tableName);
293 * @return TableMetadata
295 public KeyspaceMetadata returnKeyspaceMetadata(String keyspace) {
296 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
302 * Utility function to return the Java specific object type.
309 public Object getColValue(Row row, String colName, DataType colType) {
311 switch (colType.getName()) {
313 return row.getString(colName);
315 return row.getUUID(colName);
317 return row.getVarint(colName);
319 return row.getLong(colName);
321 return row.getInt(colName);
323 return row.getFloat(colName);
325 return row.getDouble(colName);
327 return row.getBool(colName);
329 return row.getMap(colName, String.class, String.class);
331 return row.getList(colName, String.class);
337 public byte[] getBlobValue(Row row, String colName, DataType colType) {
338 ByteBuffer bb = row.getBytes(colName);
342 public boolean doesRowSatisfyCondition(Row row, Map<String, Object> condition) throws Exception {
343 ColumnDefinitions colInfo = row.getColumnDefinitions();
345 for (Map.Entry<String, Object> entry : condition.entrySet()) {
346 String colName = entry.getKey();
347 DataType colType = colInfo.getType(colName);
348 Object columnValue = getColValue(row, colName, colType);
349 Object conditionValue = MusicUtil.convertToActualDataType(colType, entry.getValue());
350 if (columnValue.equals(conditionValue) == false)
357 * Utility function to store ResultSet values in to a MAP for output.
362 public Map<String, HashMap<String, Object>> marshalData(ResultSet results) {
363 Map<String, HashMap<String, Object>> resultMap =
366 for (Row row : results) {
367 ColumnDefinitions colInfo = row.getColumnDefinitions();
368 HashMap<String, Object> resultOutput = new HashMap<>();
369 for (Definition definition : colInfo) {
370 if (!(("vector_ts").equals(definition.getName()))) {
371 if(definition.getType().toString().toLowerCase().contains("blob")) {
372 resultOutput.put(definition.getName(),
373 getBlobValue(row, definition.getName(), definition.getType()));
375 resultOutput.put(definition.getName(),
376 getColValue(row, definition.getName(), definition.getType()));
380 resultMap.put("row " + counter, resultOutput);
387 // Prepared Statements 1802 additions
389 public boolean executePut(PreparedQueryObject queryObject, String consistency)
390 throws MusicServiceException, MusicQueryException {
391 return executePut(queryObject, consistency, 0);
394 * This Method performs DDL and DML operations on Cassandra using specified consistency level
396 * @param queryObject Object containing cassandra prepared query and values.
397 * @param consistency Specify consistency level for data synchronization across cassandra
399 * @return Boolean Indicates operation success or failure
400 * @throws MusicServiceException
401 * @throws MusicQueryException
403 public boolean executePut(PreparedQueryObject queryObject, String consistency,long timeSlot)
404 throws MusicServiceException, MusicQueryException {
406 boolean result = false;
407 long timeOfWrite = System.currentTimeMillis();
408 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
409 logger.error(EELFLoggerDelegate.errorLogger, queryObject.getQuery(),AppMessages.QUERYERROR, ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
410 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
411 + queryObject.getQuery() + "]");
413 logger.info(EELFLoggerDelegate.applicationLogger,
414 "In preprared Execute Put: the actual insert query:"
415 + queryObject.getQuery() + "; the values"
416 + queryObject.getValues());
417 SimpleStatement preparedInsert = null;
420 preparedInsert = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray());
421 if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
422 logger.info(EELFLoggerDelegate.applicationLogger, "Executing critical put query");
423 preparedInsert.setConsistencyLevel(ConsistencyLevel.QUORUM);
424 } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
425 logger.info(EELFLoggerDelegate.applicationLogger, "Executing simple put query");
426 if(queryObject.getConsistency() == null)
427 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
429 preparedInsert.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
430 } else if (consistency.equalsIgnoreCase(MusicUtil.ONE)) {
431 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
432 } else if (consistency.equalsIgnoreCase(MusicUtil.QUORUM)) {
433 preparedInsert.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
434 } else if (consistency.equalsIgnoreCase(MusicUtil.ALL)) {
435 preparedInsert.setConsistencyLevel(ConsistencyLevel.ALL);
437 long timestamp = MusicUtil.v2sTimeStampInMicroseconds(timeSlot, timeOfWrite);
438 preparedInsert.setDefaultTimestamp(timestamp);
440 ResultSet rs = session.execute(preparedInsert);
441 result = rs.wasApplied();
444 catch (AlreadyExistsException ae) {
445 logger.error(EELFLoggerDelegate.errorLogger, ae.getMessage(),AppMessages.SESSIONFAILED+ " [" +
446 queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, ae);
447 throw new MusicServiceException(ae.getMessage());
449 catch (Exception e) {
450 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.SESSIONFAILED + " ["
451 + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, e);
452 throw new MusicServiceException("Executing Session Failure for Request = " + "["
453 + queryObject.getQuery() + "]" + " Reason = " + e.getMessage());
461 * This method performs DDL operations on Cassandra using consistency level ONE.
463 * @param queryObject Object containing cassandra prepared query and values.
465 * @throws MusicServiceException
466 * @throws MusicQueryException
468 public ResultSet executeEventualGet(PreparedQueryObject queryObject)
469 throws MusicServiceException, MusicQueryException {
470 CacheAccess<String, PreparedStatement> queryBank = CachingUtil.getStatementBank();
471 PreparedStatement preparedEventualGet = null;
472 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
473 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
474 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
475 + queryObject.getQuery() + "]");
477 logger.info(EELFLoggerDelegate.applicationLogger,
478 "Executing Eventual get query:" + queryObject.getQuery());
480 ResultSet results = null;
482 if(queryBank.get(queryObject.getQuery()) != null )
483 preparedEventualGet=queryBank.get(queryObject.getQuery());
485 preparedEventualGet = session.prepare(queryObject.getQuery());
486 CachingUtil.updateStatementBank(queryObject.getQuery(), preparedEventualGet);
488 if(queryObject.getConsistency() == null) {
489 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE);
491 preparedEventualGet.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
493 results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
495 } catch (Exception ex) {
496 logger.error("Exception", ex);
497 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
498 throw new MusicServiceException(ex.getMessage());
505 * This method performs DDL operation on Cassandra using consistency level QUORUM.
507 * @param queryObject Object containing cassandra prepared query and values.
509 * @throws MusicServiceException
510 * @throws MusicQueryException
512 public ResultSet executeCriticalGet(PreparedQueryObject queryObject)
513 throws MusicServiceException, MusicQueryException {
514 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
515 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
516 throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
517 + queryObject.getQuery() + "]");
519 logger.info(EELFLoggerDelegate.applicationLogger,
520 "Executing Critical get query:" + queryObject.getQuery());
521 PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
522 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.QUORUM);
523 ResultSet results = null;
525 results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
526 } catch (Exception ex) {
527 logger.error("Exception", ex);
528 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
529 throw new MusicServiceException(ex.getMessage());
535 public ResultSet executeGet(PreparedQueryObject queryObject,String consistencyLevel) throws MusicQueryException, MusicServiceException {
536 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
537 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
538 throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
539 + queryObject.getQuery() + "]");
541 ResultSet results = null;
543 SimpleStatement statement = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray());
545 if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_ONE)) {
546 if(queryObject.getConsistency() == null) {
547 statement.setConsistencyLevel(ConsistencyLevel.ONE);
549 statement.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
552 else if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_QUORUM)) {
553 statement.setConsistencyLevel(ConsistencyLevel.QUORUM);
556 results = session.execute(statement);
558 } catch (Exception ex) {
559 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject
560 .getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, ex);
561 throw new MusicServiceException(ex.getMessage());
569 * This method performs DDL operations on Cassandra using consistency level ONE.
571 * @param queryObject Object containing cassandra prepared query and values.
573 public ResultSet executeOneConsistencyGet(PreparedQueryObject queryObject)
574 throws MusicServiceException, MusicQueryException {
575 return executeGet(queryObject, CONSISTENCY_LEVEL_ONE);
580 * This method performs DDL operation on Cassandra using consistency level QUORUM.
582 * @param queryObject Object containing cassandra prepared query and values.
584 public ResultSet executeQuorumConsistencyGet(PreparedQueryObject queryObject)
585 throws MusicServiceException, MusicQueryException {
586 return executeGet(queryObject, CONSISTENCY_LEVEL_QUORUM);