2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * ===================================================================
7 * Modifications Copyright (c) 2018-2019 IBM
8 * Modifications Copyright (c) 2019 Samsung
9 * ===================================================================
10 * Licensed under the Apache License, Version 2.0 (the "License");
11 * you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
14 * http://www.apache.org/licenses/LICENSE-2.0
16 * Unless required by applicable law or agreed to in writing, software
17 * distributed under the License is distributed on an "AS IS" BASIS,
18 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 * See the License for the specific language governing permissions and
20 * limitations under the License.
22 * ============LICENSE_END=============================================
23 * ====================================================================
26 package org.onap.music.datastore;
28 import java.net.InetAddress;
29 import java.net.NetworkInterface;
30 import java.net.SocketException;
31 import java.nio.ByteBuffer;
32 import java.util.ArrayList;
33 import java.util.Enumeration;
34 import java.util.HashMap;
35 import java.util.Iterator;
38 import org.apache.commons.jcs.access.CacheAccess;
39 import org.onap.music.authentication.CachingUtil;
40 import org.onap.music.eelf.logging.EELFLoggerDelegate;
41 import org.onap.music.eelf.logging.format.AppMessages;
42 import org.onap.music.eelf.logging.format.ErrorSeverity;
43 import org.onap.music.eelf.logging.format.ErrorTypes;
44 import org.onap.music.exceptions.MusicQueryException;
45 import org.onap.music.exceptions.MusicServiceException;
46 import org.onap.music.main.MusicUtil;
47 import com.codahale.metrics.JmxReporter;
48 import com.datastax.driver.core.Cluster;
49 import com.datastax.driver.core.ColumnDefinitions;
50 import com.datastax.driver.core.ColumnDefinitions.Definition;
51 import com.datastax.driver.core.ConsistencyLevel;
52 import com.datastax.driver.core.DataType;
53 import com.datastax.driver.core.HostDistance;
54 import com.datastax.driver.core.KeyspaceMetadata;
55 import com.datastax.driver.core.Metadata;
56 import com.datastax.driver.core.PoolingOptions;
57 import com.datastax.driver.core.PreparedStatement;
58 import com.datastax.driver.core.ResultSet;
59 import com.datastax.driver.core.Row;
60 import com.datastax.driver.core.Session;
61 import com.datastax.driver.core.SimpleStatement;
62 import com.datastax.driver.core.TableMetadata;
63 import com.datastax.driver.core.exceptions.AlreadyExistsException;
64 import com.datastax.driver.core.exceptions.InvalidQueryException;
65 import com.datastax.driver.core.exceptions.NoHostAvailableException;
66 import com.sun.jersey.core.util.Base64;
72 public class MusicDataStore {
74 public static final String CONSISTENCY_LEVEL_ONE = "ONE";
75 public static final String CONSISTENCY_LEVEL_QUORUM = "QUORUM";
76 private Session session;
77 private Cluster cluster;
83 public void setSession(Session session) {
84 this.session = session;
90 public Session getSession() {
97 public void setCluster(Cluster cluster) {
98 this.cluster = cluster;
102 private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class);
107 public MusicDataStore() {
108 connectToCassaCluster();
116 public MusicDataStore(Cluster cluster, Session session) {
117 this.session = session;
118 this.cluster = cluster;
124 * @throws MusicServiceException
126 public MusicDataStore(String remoteIp) {
128 connectToCassaCluster(remoteIp);
129 } catch (MusicServiceException e) {
130 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), e);
138 private ArrayList<String> getAllPossibleLocalIps() {
139 ArrayList<String> allPossibleIps = new ArrayList<>();
141 Enumeration<NetworkInterface> en = NetworkInterface.getNetworkInterfaces();
142 while (en.hasMoreElements()) {
143 NetworkInterface ni = en.nextElement();
144 Enumeration<InetAddress> ee = ni.getInetAddresses();
145 while (ee.hasMoreElements()) {
146 InetAddress ia = ee.nextElement();
147 allPossibleIps.add(ia.getHostAddress());
150 } catch (SocketException e) {
151 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.CONNCECTIVITYERROR,
152 ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR, e);
153 }catch(Exception e) {
154 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), ErrorSeverity.ERROR, ErrorTypes
155 .GENERALSERVICEERROR, e);
157 return allPossibleIps;
161 * This method iterates through all available IP addresses and connects to multiple cassandra
164 private void connectToCassaCluster() {
165 Iterator<String> it = getAllPossibleLocalIps().iterator();
166 String address = "localhost";
167 String[] addresses = null;
168 address = MusicUtil.getMyCassaHost();
169 addresses = address.split(",");
171 logger.info(EELFLoggerDelegate.applicationLogger,
172 "Connecting to cassa cluster: Iterating through possible ips:"
173 + getAllPossibleLocalIps());
174 PoolingOptions poolingOptions = new PoolingOptions();
176 .setConnectionsPerHost(HostDistance.LOCAL, 4, 10)
177 .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
178 while (it.hasNext()) {
180 if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
181 logger.info(EELFLoggerDelegate.applicationLogger,
182 "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd());
183 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
184 .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
185 //.withLoadBalancingPolicy(new RoundRobinPolicy())
186 .withoutJMXReporting()
187 .withPoolingOptions(poolingOptions)
188 .addContactPoints(addresses).build();
191 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
192 //.withLoadBalancingPolicy(new RoundRobinPolicy())
193 .addContactPoints(addresses).build();
195 Metadata metadata = cluster.getMetadata();
196 logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
197 + metadata.getClusterName() + " at " + address);
198 session = cluster.connect();
201 } catch (NoHostAvailableException e) {
203 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.HOSTUNAVAILABLE,
204 ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR, e);
212 public void close() {
217 * This method connects to cassandra cluster on specific address.
221 private void connectToCassaCluster(String address) throws MusicServiceException {
222 String[] addresses = null;
223 addresses = address.split(",");
224 PoolingOptions poolingOptions = new PoolingOptions();
226 .setConnectionsPerHost(HostDistance.LOCAL, 4, 10)
227 .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
228 if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
229 logger.info(EELFLoggerDelegate.applicationLogger,
230 "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd());
231 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
232 .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
233 //.withLoadBalancingPolicy(new RoundRobinPolicy())
234 .withoutJMXReporting()
235 .withPoolingOptions(poolingOptions)
236 .addContactPoints(addresses).build();
238 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
239 //.withLoadBalancingPolicy(new RoundRobinPolicy())
240 .withoutJMXReporting()
241 .withPoolingOptions(poolingOptions)
242 .addContactPoints(addresses).build();
245 // JmxReporter reporter =
246 // JmxReporter.forRegistry(cluster.getMetrics().getRegistry())
247 // .inDomain(cluster.getClusterName() + "-metrics")
252 Metadata metadata = cluster.getMetadata();
253 logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
254 + metadata.getClusterName() + " at " + address);
256 session = cluster.connect();
257 } catch (Exception ex) {
258 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.CASSANDRACONNECTIVITY,
259 ErrorSeverity.ERROR, ErrorTypes.SERVICEUNAVAILABLE, ex);
260 throw new MusicServiceException(
261 "Error while connecting to Cassandra cluster.. " + ex.getMessage());
272 public DataType returnColumnDataType(String keyspace, String tableName, String columnName) {
273 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
274 TableMetadata table = ks.getTable(tableName);
275 return table.getColumn(columnName).getType();
283 * @return TableMetadata
285 public TableMetadata returnColumnMetadata(String keyspace, String tableName) {
286 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
287 return ks.getTable(tableName);
294 * @return TableMetadata
296 public KeyspaceMetadata returnKeyspaceMetadata(String keyspace) {
297 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
303 * Utility function to return the Java specific object type.
310 public Object getColValue(Row row, String colName, DataType colType) {
312 switch (colType.getName()) {
314 return row.getString(colName);
316 return row.getUUID(colName);
318 return row.getVarint(colName);
320 return row.getLong(colName);
322 return row.getInt(colName);
324 return row.getFloat(colName);
326 return row.getDouble(colName);
328 return row.getBool(colName);
330 return row.getMap(colName, String.class, String.class);
332 return row.getList(colName, String.class);
338 public byte[] getBlobValue(Row row, String colName, DataType colType) {
339 ByteBuffer bb = row.getBytes(colName);
343 public boolean doesRowSatisfyCondition(Row row, Map<String, Object> condition) throws Exception {
344 ColumnDefinitions colInfo = row.getColumnDefinitions();
346 for (Map.Entry<String, Object> entry : condition.entrySet()) {
347 String colName = entry.getKey();
348 DataType colType = colInfo.getType(colName);
349 Object columnValue = getColValue(row, colName, colType);
350 Object conditionValue = MusicUtil.convertToActualDataType(colType, entry.getValue());
351 if (columnValue.equals(conditionValue) == false)
358 * Utility function to store ResultSet values in to a MAP for output.
363 public Map<String, HashMap<String, Object>> marshalData(ResultSet results) {
364 Map<String, HashMap<String, Object>> resultMap =
367 for (Row row : results) {
368 ColumnDefinitions colInfo = row.getColumnDefinitions();
369 HashMap<String, Object> resultOutput = new HashMap<>();
370 for (Definition definition : colInfo) {
371 if (!(("vector_ts").equals(definition.getName()))) {
372 if(definition.getType().toString().toLowerCase().contains("blob")) {
373 resultOutput.put(definition.getName(),
374 getBlobValue(row, definition.getName(), definition.getType()));
376 resultOutput.put(definition.getName(),
377 getColValue(row, definition.getName(), definition.getType()));
381 resultMap.put("row " + counter, resultOutput);
388 // Prepared Statements 1802 additions
390 public boolean executePut(PreparedQueryObject queryObject, String consistency)
391 throws MusicServiceException, MusicQueryException {
392 return executePut(queryObject, consistency, 0);
395 * This Method performs DDL and DML operations on Cassandra using specified consistency level
397 * @param queryObject Object containing cassandra prepared query and values.
398 * @param consistency Specify consistency level for data synchronization across cassandra
400 * @return Boolean Indicates operation success or failure
401 * @throws MusicServiceException
402 * @throws MusicQueryException
404 public boolean executePut(PreparedQueryObject queryObject, String consistency,long timeSlot)
405 throws MusicServiceException, MusicQueryException {
407 boolean result = false;
408 long timeOfWrite = System.currentTimeMillis();
409 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
410 logger.error(EELFLoggerDelegate.errorLogger, queryObject.getQuery(),AppMessages.QUERYERROR, ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
411 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
412 + queryObject.getQuery() + "]");
414 logger.info(EELFLoggerDelegate.applicationLogger,
415 "In preprared Execute Put: the actual insert query:"
416 + queryObject.getQuery() + "; the values"
417 + queryObject.getValues());
418 SimpleStatement preparedInsert = null;
421 preparedInsert = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray());
422 if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
423 logger.info(EELFLoggerDelegate.applicationLogger, "Executing critical put query");
424 preparedInsert.setConsistencyLevel(ConsistencyLevel.QUORUM);
425 } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
426 logger.info(EELFLoggerDelegate.applicationLogger, "Executing simple put query");
427 if(queryObject.getConsistency() == null)
428 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
430 preparedInsert.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
431 } else if (consistency.equalsIgnoreCase(MusicUtil.ONE)) {
432 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
433 } else if (consistency.equalsIgnoreCase(MusicUtil.QUORUM)) {
434 preparedInsert.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
435 } else if (consistency.equalsIgnoreCase(MusicUtil.ALL)) {
436 preparedInsert.setConsistencyLevel(ConsistencyLevel.ALL);
438 long timestamp = MusicUtil.v2sTimeStampInMicroseconds(timeSlot, timeOfWrite);
439 preparedInsert.setDefaultTimestamp(timestamp);
441 ResultSet rs = session.execute(preparedInsert);
442 result = rs.wasApplied();
445 catch (AlreadyExistsException ae) {
446 logger.error(EELFLoggerDelegate.errorLogger, ae.getMessage(),AppMessages.SESSIONFAILED+ " [" +
447 queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, ae);
448 throw new MusicServiceException(ae.getMessage());
450 catch (Exception e) {
451 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.SESSIONFAILED + " ["
452 + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, e);
453 throw new MusicServiceException("Executing Session Failure for Request = " + "["
454 + queryObject.getQuery() + "]" + " Reason = " + e.getMessage());
462 * This method performs DDL operations on Cassandra using consistency level ONE.
464 * @param queryObject Object containing cassandra prepared query and values.
466 * @throws MusicServiceException
467 * @throws MusicQueryException
469 public ResultSet executeEventualGet(PreparedQueryObject queryObject)
470 throws MusicServiceException, MusicQueryException {
471 CacheAccess<String, PreparedStatement> queryBank = CachingUtil.getStatementBank();
472 PreparedStatement preparedEventualGet = null;
473 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
474 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
475 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
476 + queryObject.getQuery() + "]");
478 logger.info(EELFLoggerDelegate.applicationLogger,
479 "Executing Eventual get query:" + queryObject.getQuery());
481 ResultSet results = null;
483 if(queryBank.get(queryObject.getQuery()) != null )
484 preparedEventualGet=queryBank.get(queryObject.getQuery());
486 preparedEventualGet = session.prepare(queryObject.getQuery());
487 CachingUtil.updateStatementBank(queryObject.getQuery(), preparedEventualGet);
489 if(queryObject.getConsistency() == null) {
490 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE);
492 preparedEventualGet.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
494 results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
496 } catch (Exception ex) {
497 logger.error("Exception", ex);
498 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
499 throw new MusicServiceException(ex.getMessage());
506 * This method performs DDL operation on Cassandra using consistency level QUORUM.
508 * @param queryObject Object containing cassandra prepared query and values.
510 * @throws MusicServiceException
511 * @throws MusicQueryException
513 public ResultSet executeCriticalGet(PreparedQueryObject queryObject)
514 throws MusicServiceException, MusicQueryException {
515 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
516 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
517 throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
518 + queryObject.getQuery() + "]");
520 logger.info(EELFLoggerDelegate.applicationLogger,
521 "Executing Critical get query:" + queryObject.getQuery());
522 PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
523 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.QUORUM);
524 ResultSet results = null;
526 results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
527 } catch (Exception ex) {
528 logger.error("Exception", ex);
529 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
530 throw new MusicServiceException(ex.getMessage());
536 public ResultSet executeGet(PreparedQueryObject queryObject,String consistencyLevel) throws MusicQueryException, MusicServiceException {
537 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
538 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
539 throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
540 + queryObject.getQuery() + "]");
542 ResultSet results = null;
544 SimpleStatement statement = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray());
546 if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_ONE)) {
547 if(queryObject.getConsistency() == null) {
548 statement.setConsistencyLevel(ConsistencyLevel.ONE);
550 statement.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
553 else if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_QUORUM)) {
554 statement.setConsistencyLevel(ConsistencyLevel.QUORUM);
557 results = session.execute(statement);
559 } catch (Exception ex) {
560 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject
561 .getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, ex);
562 throw new MusicServiceException(ex.getMessage());
570 * This method performs DDL operations on Cassandra using consistency level ONE.
572 * @param queryObject Object containing cassandra prepared query and values.
574 public ResultSet executeOneConsistencyGet(PreparedQueryObject queryObject)
575 throws MusicServiceException, MusicQueryException {
576 return executeGet(queryObject, CONSISTENCY_LEVEL_ONE);
581 * This method performs DDL operation on Cassandra using consistency level QUORUM.
583 * @param queryObject Object containing cassandra prepared query and values.
585 public ResultSet executeQuorumConsistencyGet(PreparedQueryObject queryObject)
586 throws MusicServiceException, MusicQueryException {
587 return executeGet(queryObject, CONSISTENCY_LEVEL_QUORUM);