2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * ===================================================================
7 * Modifications Copyright (c) 2018-2019 IBM
8 * Modifications Copyright (c) 2019 Samsung
9 * ===================================================================
10 * Licensed under the Apache License, Version 2.0 (the "License");
11 * you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
14 * http://www.apache.org/licenses/LICENSE-2.0
16 * Unless required by applicable law or agreed to in writing, software
17 * distributed under the License is distributed on an "AS IS" BASIS,
18 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 * See the License for the specific language governing permissions and
20 * limitations under the License.
22 * ============LICENSE_END=============================================
23 * ====================================================================
26 package org.onap.music.datastore;
28 import java.net.InetAddress;
29 import java.net.NetworkInterface;
30 import java.net.SocketException;
31 import java.nio.ByteBuffer;
32 import java.util.ArrayList;
33 import java.util.Enumeration;
34 import java.util.HashMap;
35 import java.util.Iterator;
38 import org.apache.commons.jcs.access.CacheAccess;
39 import org.onap.music.authentication.CachingUtil;
40 import org.onap.music.eelf.logging.EELFLoggerDelegate;
41 import org.onap.music.eelf.logging.format.AppMessages;
42 import org.onap.music.eelf.logging.format.ErrorSeverity;
43 import org.onap.music.eelf.logging.format.ErrorTypes;
44 import org.onap.music.exceptions.MusicQueryException;
45 import org.onap.music.exceptions.MusicServiceException;
46 import org.onap.music.main.MusicUtil;
48 import com.codahale.metrics.JmxReporter;
49 import com.datastax.driver.core.Cluster;
50 import com.datastax.driver.core.ColumnDefinitions;
51 import com.datastax.driver.core.ColumnDefinitions.Definition;
52 import com.datastax.driver.core.ConsistencyLevel;
53 import com.datastax.driver.core.DataType;
54 import com.datastax.driver.core.HostDistance;
55 import com.datastax.driver.core.KeyspaceMetadata;
56 import com.datastax.driver.core.Metadata;
57 import com.datastax.driver.core.PoolingOptions;
58 import com.datastax.driver.core.PreparedStatement;
59 import com.datastax.driver.core.ResultSet;
60 import com.datastax.driver.core.Row;
61 import com.datastax.driver.core.Session;
62 import com.datastax.driver.core.SimpleStatement;
63 import com.datastax.driver.core.TableMetadata;
64 import com.datastax.driver.core.exceptions.AlreadyExistsException;
65 import com.datastax.driver.core.exceptions.InvalidQueryException;
66 import com.datastax.driver.core.exceptions.NoHostAvailableException;
67 import com.sun.jersey.core.util.Base64;
73 public class MusicDataStore {
75 public static final String CONSISTENCY_LEVEL_ONE = "ONE";
76 public static final String CONSISTENCY_LEVEL_QUORUM = "QUORUM";
77 private Session session;
78 private Cluster cluster;
84 public void setSession(Session session) {
85 this.session = session;
91 public Session getSession() {
98 public void setCluster(Cluster cluster) {
99 this.cluster = cluster;
103 private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class);
108 public MusicDataStore() {
109 connectToCassaCluster();
117 public MusicDataStore(Cluster cluster, Session session) {
118 this.session = session;
119 this.cluster = cluster;
125 * @throws MusicServiceException
127 public MusicDataStore(String remoteIp) {
129 connectToCassaCluster(remoteIp);
130 } catch (MusicServiceException e) {
131 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), e);
139 private ArrayList<String> getAllPossibleLocalIps() {
140 ArrayList<String> allPossibleIps = new ArrayList<>();
142 Enumeration<NetworkInterface> en = NetworkInterface.getNetworkInterfaces();
143 while (en.hasMoreElements()) {
144 NetworkInterface ni = en.nextElement();
145 Enumeration<InetAddress> ee = ni.getInetAddresses();
146 while (ee.hasMoreElements()) {
147 InetAddress ia = ee.nextElement();
148 allPossibleIps.add(ia.getHostAddress());
151 } catch (SocketException e) {
152 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.CONNCECTIVITYERROR,
153 ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR, e);
154 }catch(Exception e) {
155 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), ErrorSeverity.ERROR, ErrorTypes
156 .GENERALSERVICEERROR, e);
158 return allPossibleIps;
162 * This method iterates through all available IP addresses and connects to multiple cassandra
165 private void connectToCassaCluster() {
166 Iterator<String> it = getAllPossibleLocalIps().iterator();
167 String address = "localhost";
168 String[] addresses = null;
169 address = MusicUtil.getMyCassaHost();
170 addresses = address.split(",");
172 logger.info(EELFLoggerDelegate.applicationLogger,
173 "Connecting to cassa cluster: Iterating through possible ips:"
174 + getAllPossibleLocalIps());
175 PoolingOptions poolingOptions = new PoolingOptions();
177 .setConnectionsPerHost(HostDistance.LOCAL, 4, 10)
178 .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
179 while (it.hasNext()) {
181 if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
182 logger.info(EELFLoggerDelegate.applicationLogger,
183 "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd());
184 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
185 .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
186 //.withLoadBalancingPolicy(new RoundRobinPolicy())
187 .withoutJMXReporting()
188 .withPoolingOptions(poolingOptions)
189 .addContactPoints(addresses).build();
192 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
193 //.withLoadBalancingPolicy(new RoundRobinPolicy())
194 .addContactPoints(addresses).build();
196 Metadata metadata = cluster.getMetadata();
197 logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
198 + metadata.getClusterName() + " at " + address);
199 session = cluster.connect();
202 } catch (NoHostAvailableException e) {
204 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.HOSTUNAVAILABLE,
205 ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR, e);
213 public void close() {
218 * This method connects to cassandra cluster on specific address.
222 private void connectToCassaCluster(String address) throws MusicServiceException {
223 String[] addresses = null;
224 addresses = address.split(",");
225 PoolingOptions poolingOptions = new PoolingOptions();
227 .setConnectionsPerHost(HostDistance.LOCAL, 4, 10)
228 .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
229 if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
230 logger.info(EELFLoggerDelegate.applicationLogger,
231 "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd());
232 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
233 .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
234 //.withLoadBalancingPolicy(new RoundRobinPolicy())
235 .withoutJMXReporting()
236 .withPoolingOptions(poolingOptions)
237 .addContactPoints(addresses).build();
240 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
241 //.withLoadBalancingPolicy(new RoundRobinPolicy())
242 .withoutJMXReporting()
243 .withPoolingOptions(poolingOptions)
244 .addContactPoints(addresses).build();
247 // JmxReporter reporter =
248 // JmxReporter.forRegistry(cluster.getMetrics().getRegistry())
249 // .inDomain(cluster.getClusterName() + "-metrics")
254 Metadata metadata = cluster.getMetadata();
255 logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
256 + metadata.getClusterName() + " at " + address);
258 session = cluster.connect();
259 } catch (Exception ex) {
260 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.CASSANDRACONNECTIVITY,
261 ErrorSeverity.ERROR, ErrorTypes.SERVICEUNAVAILABLE, ex);
262 throw new MusicServiceException(
263 "Error while connecting to Cassandra cluster.. " + ex.getMessage());
274 public DataType returnColumnDataType(String keyspace, String tableName, String columnName) {
275 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
276 TableMetadata table = ks.getTable(tableName);
277 return table.getColumn(columnName).getType();
285 * @return TableMetadata
287 public TableMetadata returnColumnMetadata(String keyspace, String tableName) {
288 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
289 return ks.getTable(tableName);
294 * Utility function to return the Java specific object type.
301 public Object getColValue(Row row, String colName, DataType colType) {
303 switch (colType.getName()) {
305 return row.getString(colName);
307 return row.getUUID(colName);
309 return row.getVarint(colName);
311 return row.getLong(colName);
313 return row.getInt(colName);
315 return row.getFloat(colName);
317 return row.getDouble(colName);
319 return row.getBool(colName);
321 return row.getMap(colName, String.class, String.class);
323 return row.getList(colName, String.class);
329 public byte[] getBlobValue(Row row, String colName, DataType colType) {
330 ByteBuffer bb = row.getBytes(colName);
334 public boolean doesRowSatisfyCondition(Row row, Map<String, Object> condition) throws Exception {
335 ColumnDefinitions colInfo = row.getColumnDefinitions();
337 for (Map.Entry<String, Object> entry : condition.entrySet()) {
338 String colName = entry.getKey();
339 DataType colType = colInfo.getType(colName);
340 Object columnValue = getColValue(row, colName, colType);
341 Object conditionValue = MusicUtil.convertToActualDataType(colType, entry.getValue());
342 if (columnValue.equals(conditionValue) == false)
349 * Utility function to store ResultSet values in to a MAP for output.
354 public Map<String, HashMap<String, Object>> marshalData(ResultSet results) {
355 Map<String, HashMap<String, Object>> resultMap =
358 for (Row row : results) {
359 ColumnDefinitions colInfo = row.getColumnDefinitions();
360 HashMap<String, Object> resultOutput = new HashMap<>();
361 for (Definition definition : colInfo) {
362 if (!(("vector_ts").equals(definition.getName()))) {
363 if(definition.getType().toString().toLowerCase().contains("blob")) {
364 resultOutput.put(definition.getName(),
365 getBlobValue(row, definition.getName(), definition.getType()));
368 resultOutput.put(definition.getName(),
369 getColValue(row, definition.getName(), definition.getType()));
372 resultMap.put("row " + counter, resultOutput);
379 // Prepared Statements 1802 additions
381 public boolean executePut(PreparedQueryObject queryObject, String consistency)
382 throws MusicServiceException, MusicQueryException {
383 return executePut(queryObject, consistency, 0);
386 * This Method performs DDL and DML operations on Cassandra using specified consistency level
388 * @param queryObject Object containing cassandra prepared query and values.
389 * @param consistency Specify consistency level for data synchronization across cassandra
391 * @return Boolean Indicates operation success or failure
392 * @throws MusicServiceException
393 * @throws MusicQueryException
395 public boolean executePut(PreparedQueryObject queryObject, String consistency,long timeSlot)
396 throws MusicServiceException, MusicQueryException {
398 boolean result = false;
399 long timeOfWrite = System.currentTimeMillis();
400 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
401 logger.error(EELFLoggerDelegate.errorLogger, queryObject.getQuery(),AppMessages.QUERYERROR, ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
402 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
403 + queryObject.getQuery() + "]");
405 logger.info(EELFLoggerDelegate.applicationLogger,
406 "In preprared Execute Put: the actual insert query:"
407 + queryObject.getQuery() + "; the values"
408 + queryObject.getValues());
410 PreparedStatement preparedInsert = null;
413 preparedInsert = session.prepare(queryObject.getQuery());
415 } catch(InvalidQueryException iqe) {
416 logger.error("Exception", iqe);
417 logger.error(EELFLoggerDelegate.errorLogger, iqe.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
418 throw new MusicQueryException(iqe.getMessage());
419 }catch(Exception e) {
420 logger.error("Exception", e);
421 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
422 throw new MusicQueryException(e.getMessage());
426 SimpleStatement preparedInsert = null;
429 preparedInsert = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray());
430 if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
431 logger.info(EELFLoggerDelegate.applicationLogger, "Executing critical put query");
432 preparedInsert.setConsistencyLevel(ConsistencyLevel.QUORUM);
433 } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
434 logger.info(EELFLoggerDelegate.applicationLogger, "Executing simple put query");
435 if(queryObject.getConsistency() == null)
436 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
438 preparedInsert.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
439 } else if (consistency.equalsIgnoreCase(MusicUtil.ONE)) {
440 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
441 } else if (consistency.equalsIgnoreCase(MusicUtil.QUORUM)) {
442 preparedInsert.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
443 } else if (consistency.equalsIgnoreCase(MusicUtil.ALL)) {
444 preparedInsert.setConsistencyLevel(ConsistencyLevel.ALL);
446 long timestamp = MusicUtil.v2sTimeStampInMicroseconds(timeSlot, timeOfWrite);
447 preparedInsert.setDefaultTimestamp(timestamp);
449 ResultSet rs = session.execute(preparedInsert);
450 result = rs.wasApplied();
453 catch (AlreadyExistsException ae) {
454 logger.error(EELFLoggerDelegate.errorLogger, ae.getMessage(),AppMessages.SESSIONFAILED+ " [" +
455 queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, ae);
456 throw new MusicServiceException(ae.getMessage());
458 catch (Exception e) {
459 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.SESSIONFAILED+ " [" +
460 queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, e);
461 throw new MusicQueryException("Executing Session Failure for Request = " + "["
462 + queryObject.getQuery() + "]" + " Reason = " + e.getMessage());
470 * This method performs DDL operations on Cassandra using consistency level ONE.
472 * @param queryObject Object containing cassandra prepared query and values.
474 * @throws MusicServiceException
475 * @throws MusicQueryException
477 public ResultSet executeEventualGet(PreparedQueryObject queryObject)
478 throws MusicServiceException, MusicQueryException {
479 CacheAccess<String, PreparedStatement> queryBank = CachingUtil.getStatementBank();
480 PreparedStatement preparedEventualGet = null;
481 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
482 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
483 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
484 + queryObject.getQuery() + "]");
486 logger.info(EELFLoggerDelegate.applicationLogger,
487 "Executing Eventual get query:" + queryObject.getQuery());
489 ResultSet results = null;
491 if(queryBank.get(queryObject.getQuery()) != null )
492 preparedEventualGet=queryBank.get(queryObject.getQuery());
494 preparedEventualGet = session.prepare(queryObject.getQuery());
495 CachingUtil.updateStatementBank(queryObject.getQuery(), preparedEventualGet);
497 if(queryObject.getConsistency() == null) {
498 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE);
501 preparedEventualGet.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
503 results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
505 } catch (Exception ex) {
506 logger.error("Exception", ex);
507 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
508 throw new MusicServiceException(ex.getMessage());
515 * This method performs DDL operation on Cassandra using consistency level QUORUM.
517 * @param queryObject Object containing cassandra prepared query and values.
519 * @throws MusicServiceException
520 * @throws MusicQueryException
522 public ResultSet executeCriticalGet(PreparedQueryObject queryObject)
523 throws MusicServiceException, MusicQueryException {
524 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
525 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
526 throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
527 + queryObject.getQuery() + "]");
529 logger.info(EELFLoggerDelegate.applicationLogger,
530 "Executing Critical get query:" + queryObject.getQuery());
531 PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
532 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.QUORUM);
533 ResultSet results = null;
535 results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
536 } catch (Exception ex) {
537 logger.error("Exception", ex);
538 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
539 throw new MusicServiceException(ex.getMessage());
545 public ResultSet executeGet(PreparedQueryObject queryObject,String consistencyLevel) throws MusicQueryException, MusicServiceException {
546 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
547 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
548 throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
549 + queryObject.getQuery() + "]");
551 ResultSet results = null;
553 SimpleStatement statement = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray());
555 if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_ONE)) {
556 if(queryObject.getConsistency() == null) {
557 statement.setConsistencyLevel(ConsistencyLevel.ONE);
560 statement.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
563 else if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_QUORUM)) {
564 statement.setConsistencyLevel(ConsistencyLevel.QUORUM);
567 results = session.execute(statement);
569 } catch (Exception ex) {
570 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject
571 .getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, ex);
572 throw new MusicServiceException(ex.getMessage());
580 * This method performs DDL operations on Cassandra using consistency level ONE.
582 * @param queryObject Object containing cassandra prepared query and values.
584 public ResultSet executeOneConsistencyGet(PreparedQueryObject queryObject)
585 throws MusicServiceException, MusicQueryException {
586 return executeGet(queryObject, CONSISTENCY_LEVEL_ONE);
591 * This method performs DDL operation on Cassandra using consistency level QUORUM.
593 * @param queryObject Object containing cassandra prepared query and values.
595 public ResultSet executeQuorumConsistencyGet(PreparedQueryObject queryObject)
596 throws MusicServiceException, MusicQueryException {
597 return executeGet(queryObject, CONSISTENCY_LEVEL_QUORUM);