2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * ===================================================================
7 * Modifications Copyright (c) 2018-2019 IBM
8 * ===================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
21 * ============LICENSE_END=============================================
22 * ====================================================================
25 package org.onap.music.datastore;
27 import java.net.InetAddress;
28 import java.net.NetworkInterface;
29 import java.net.SocketException;
30 import java.nio.ByteBuffer;
31 import java.util.ArrayList;
32 import java.util.Enumeration;
33 import java.util.HashMap;
34 import java.util.Iterator;
37 import org.apache.commons.jcs.access.CacheAccess;
38 import org.onap.music.eelf.logging.EELFLoggerDelegate;
39 import org.onap.music.eelf.logging.format.AppMessages;
40 import org.onap.music.eelf.logging.format.ErrorSeverity;
41 import org.onap.music.eelf.logging.format.ErrorTypes;
42 import org.onap.music.exceptions.MusicQueryException;
43 import org.onap.music.exceptions.MusicServiceException;
44 import org.onap.music.main.CachingUtil;
45 import org.onap.music.main.MusicUtil;
47 import com.codahale.metrics.JmxReporter;
48 import com.datastax.driver.core.Cluster;
49 import com.datastax.driver.core.ColumnDefinitions;
50 import com.datastax.driver.core.ColumnDefinitions.Definition;
51 import com.datastax.driver.core.ConsistencyLevel;
52 import com.datastax.driver.core.DataType;
53 import com.datastax.driver.core.HostDistance;
54 import com.datastax.driver.core.KeyspaceMetadata;
55 import com.datastax.driver.core.Metadata;
56 import com.datastax.driver.core.PoolingOptions;
57 import com.datastax.driver.core.PreparedStatement;
58 import com.datastax.driver.core.ResultSet;
59 import com.datastax.driver.core.Row;
60 import com.datastax.driver.core.Session;
61 import com.datastax.driver.core.SimpleStatement;
62 import com.datastax.driver.core.TableMetadata;
63 import com.datastax.driver.core.exceptions.AlreadyExistsException;
64 import com.datastax.driver.core.exceptions.InvalidQueryException;
65 import com.datastax.driver.core.exceptions.NoHostAvailableException;
66 import com.sun.jersey.core.util.Base64;
72 public class MusicDataStore {
74 public static final String CONSISTENCY_LEVEL_ONE = "ONE";
75 public static final String CONSISTENCY_LEVEL_QUORUM = "QUORUM";
76 private Session session;
77 private Cluster cluster;
83 public void setSession(Session session) {
84 this.session = session;
90 public Session getSession() {
97 public void setCluster(Cluster cluster) {
98 this.cluster = cluster;
102 private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class);
107 public MusicDataStore() {
108 connectToCassaCluster();
116 public MusicDataStore(Cluster cluster, Session session) {
117 this.session = session;
118 this.cluster = cluster;
124 * @throws MusicServiceException
126 public MusicDataStore(String remoteIp) {
128 connectToCassaCluster(remoteIp);
129 } catch (MusicServiceException e) {
130 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage());
138 private ArrayList<String> getAllPossibleLocalIps() {
139 ArrayList<String> allPossibleIps = new ArrayList<>();
141 Enumeration<NetworkInterface> en = NetworkInterface.getNetworkInterfaces();
142 while (en.hasMoreElements()) {
143 NetworkInterface ni = en.nextElement();
144 Enumeration<InetAddress> ee = ni.getInetAddresses();
145 while (ee.hasMoreElements()) {
146 InetAddress ia = ee.nextElement();
147 allPossibleIps.add(ia.getHostAddress());
150 } catch (SocketException e) {
151 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.CONNCECTIVITYERROR, ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR);
152 }catch(Exception e) {
153 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), ErrorSeverity.ERROR, ErrorTypes.GENERALSERVICEERROR);
155 return allPossibleIps;
159 * This method iterates through all available IP addresses and connects to multiple cassandra
162 private void connectToCassaCluster() {
163 Iterator<String> it = getAllPossibleLocalIps().iterator();
164 String address = "localhost";
165 String[] addresses = null;
166 address = MusicUtil.getMyCassaHost();
167 addresses = address.split(",");
169 logger.info(EELFLoggerDelegate.applicationLogger,
170 "Connecting to cassa cluster: Iterating through possible ips:"
171 + getAllPossibleLocalIps());
172 PoolingOptions poolingOptions = new PoolingOptions();
174 .setConnectionsPerHost(HostDistance.LOCAL, 4, 10)
175 .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
176 while (it.hasNext()) {
178 if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
179 logger.info(EELFLoggerDelegate.applicationLogger,
180 "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd());
181 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
182 .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
183 //.withLoadBalancingPolicy(new RoundRobinPolicy())
184 .withoutJMXReporting()
185 .withPoolingOptions(poolingOptions)
186 .addContactPoints(addresses).build();
189 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
190 //.withLoadBalancingPolicy(new RoundRobinPolicy())
191 .addContactPoints(addresses).build();
193 Metadata metadata = cluster.getMetadata();
194 logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
195 + metadata.getClusterName() + " at " + address);
196 session = cluster.connect();
199 } catch (NoHostAvailableException e) {
201 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.HOSTUNAVAILABLE, ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR);
209 public void close() {
214 * This method connects to cassandra cluster on specific address.
218 private void connectToCassaCluster(String address) throws MusicServiceException {
219 String[] addresses = null;
220 addresses = address.split(",");
221 PoolingOptions poolingOptions = new PoolingOptions();
223 .setConnectionsPerHost(HostDistance.LOCAL, 4, 10)
224 .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
225 if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
226 logger.info(EELFLoggerDelegate.applicationLogger,
227 "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd());
228 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
229 .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
230 //.withLoadBalancingPolicy(new RoundRobinPolicy())
231 .withoutJMXReporting()
232 .withPoolingOptions(poolingOptions)
233 .addContactPoints(addresses).build();
236 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
237 //.withLoadBalancingPolicy(new RoundRobinPolicy())
238 .withoutJMXReporting()
239 .withPoolingOptions(poolingOptions)
240 .addContactPoints(addresses).build();
243 // JmxReporter reporter =
244 // JmxReporter.forRegistry(cluster.getMetrics().getRegistry())
245 // .inDomain(cluster.getClusterName() + "-metrics")
250 Metadata metadata = cluster.getMetadata();
251 logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
252 + metadata.getClusterName() + " at " + address);
254 session = cluster.connect();
255 } catch (Exception ex) {
256 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.CASSANDRACONNECTIVITY, ErrorSeverity.ERROR, ErrorTypes.SERVICEUNAVAILABLE);
257 throw new MusicServiceException(
258 "Error while connecting to Cassandra cluster.. " + ex.getMessage());
269 public DataType returnColumnDataType(String keyspace, String tableName, String columnName) {
270 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
271 TableMetadata table = ks.getTable(tableName);
272 return table.getColumn(columnName).getType();
280 * @return TableMetadata
282 public TableMetadata returnColumnMetadata(String keyspace, String tableName) {
283 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
284 return ks.getTable(tableName);
289 * Utility function to return the Java specific object type.
296 public Object getColValue(Row row, String colName, DataType colType) {
298 switch (colType.getName()) {
300 return row.getString(colName);
302 return row.getUUID(colName);
304 return row.getVarint(colName);
306 return row.getLong(colName);
308 return row.getInt(colName);
310 return row.getFloat(colName);
312 return row.getDouble(colName);
314 return row.getBool(colName);
316 return row.getMap(colName, String.class, String.class);
318 return row.getList(colName, String.class);
324 public byte[] getBlobValue(Row row, String colName, DataType colType) {
325 ByteBuffer bb = row.getBytes(colName);
329 public boolean doesRowSatisfyCondition(Row row, Map<String, Object> condition) throws Exception {
330 ColumnDefinitions colInfo = row.getColumnDefinitions();
332 for (Map.Entry<String, Object> entry : condition.entrySet()) {
333 String colName = entry.getKey();
334 DataType colType = colInfo.getType(colName);
335 Object columnValue = getColValue(row, colName, colType);
336 Object conditionValue = MusicUtil.convertToActualDataType(colType, entry.getValue());
337 if (columnValue.equals(conditionValue) == false)
344 * Utility function to store ResultSet values in to a MAP for output.
349 public Map<String, HashMap<String, Object>> marshalData(ResultSet results) {
350 Map<String, HashMap<String, Object>> resultMap =
353 for (Row row : results) {
354 ColumnDefinitions colInfo = row.getColumnDefinitions();
355 HashMap<String, Object> resultOutput = new HashMap<>();
356 for (Definition definition : colInfo) {
357 if (!(("vector_ts").equals(definition.getName()))) {
358 if(definition.getType().toString().toLowerCase().contains("blob")) {
359 resultOutput.put(definition.getName(),
360 getBlobValue(row, definition.getName(), definition.getType()));
363 resultOutput.put(definition.getName(),
364 getColValue(row, definition.getName(), definition.getType()));
367 resultMap.put("row " + counter, resultOutput);
374 // Prepared Statements 1802 additions
376 public boolean executePut(PreparedQueryObject queryObject, String consistency)
377 throws MusicServiceException, MusicQueryException {
378 return executePut(queryObject, consistency, 0);
381 * This Method performs DDL and DML operations on Cassandra using specified consistency level
383 * @param queryObject Object containing cassandra prepared query and values.
384 * @param consistency Specify consistency level for data synchronization across cassandra
386 * @return Boolean Indicates operation success or failure
387 * @throws MusicServiceException
388 * @throws MusicQueryException
390 public boolean executePut(PreparedQueryObject queryObject, String consistency,long timeSlot)
391 throws MusicServiceException, MusicQueryException {
393 boolean result = false;
394 long timeOfWrite = System.currentTimeMillis();
395 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
396 logger.error(EELFLoggerDelegate.errorLogger, queryObject.getQuery(),AppMessages.QUERYERROR, ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
397 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
398 + queryObject.getQuery() + "]");
400 logger.info(EELFLoggerDelegate.applicationLogger,
401 "In preprared Execute Put: the actual insert query:"
402 + queryObject.getQuery() + "; the values"
403 + queryObject.getValues());
405 PreparedStatement preparedInsert = null;
408 preparedInsert = session.prepare(queryObject.getQuery());
410 } catch(InvalidQueryException iqe) {
411 logger.error("Exception", iqe);
412 logger.error(EELFLoggerDelegate.errorLogger, iqe.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
413 throw new MusicQueryException(iqe.getMessage());
414 }catch(Exception e) {
415 logger.error("Exception", e);
416 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
417 throw new MusicQueryException(e.getMessage());
421 SimpleStatement preparedInsert = null;
424 preparedInsert = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray());
425 if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
426 logger.info(EELFLoggerDelegate.applicationLogger, "Executing critical put query");
427 preparedInsert.setConsistencyLevel(ConsistencyLevel.QUORUM);
428 } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
429 logger.info(EELFLoggerDelegate.applicationLogger, "Executing simple put query");
430 if(queryObject.getConsistency() == null)
431 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
433 preparedInsert.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
434 } else if (consistency.equalsIgnoreCase(MusicUtil.ONE)) {
435 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
436 } else if (consistency.equalsIgnoreCase(MusicUtil.QUORUM)) {
437 preparedInsert.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
438 } else if (consistency.equalsIgnoreCase(MusicUtil.ALL)) {
439 preparedInsert.setConsistencyLevel(ConsistencyLevel.ALL);
441 long timestamp = MusicUtil.v2sTimeStampInMicroseconds(timeSlot, timeOfWrite);
442 preparedInsert.setDefaultTimestamp(timestamp);
444 ResultSet rs = session.execute(preparedInsert);
445 result = rs.wasApplied();
448 catch (AlreadyExistsException ae) {
449 logger.error(EELFLoggerDelegate.errorLogger, ae.getMessage(),AppMessages.SESSIONFAILED+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
450 throw new MusicServiceException(ae.getMessage());
452 catch (Exception e) {
453 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.SESSIONFAILED+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
454 throw new MusicQueryException("Executing Session Failure for Request = " + "["
455 + queryObject.getQuery() + "]" + " Reason = " + e.getMessage());
463 * This method performs DDL operations on Cassandra using consistency level ONE.
465 * @param queryObject Object containing cassandra prepared query and values.
467 * @throws MusicServiceException
468 * @throws MusicQueryException
470 public ResultSet executeEventualGet(PreparedQueryObject queryObject)
471 throws MusicServiceException, MusicQueryException {
472 CacheAccess<String, PreparedStatement> queryBank = CachingUtil.getStatementBank();
473 PreparedStatement preparedEventualGet = null;
474 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
475 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
476 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
477 + queryObject.getQuery() + "]");
479 logger.info(EELFLoggerDelegate.applicationLogger,
480 "Executing Eventual get query:" + queryObject.getQuery());
482 ResultSet results = null;
484 if(queryBank.get(queryObject.getQuery()) != null )
485 preparedEventualGet=queryBank.get(queryObject.getQuery());
487 preparedEventualGet = session.prepare(queryObject.getQuery());
488 CachingUtil.updateStatementBank(queryObject.getQuery(), preparedEventualGet);
490 if(queryObject.getConsistency() == null) {
491 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE);
494 preparedEventualGet.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
496 results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
498 } catch (Exception ex) {
499 logger.error("Exception", ex);
500 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
501 throw new MusicServiceException(ex.getMessage());
508 * This method performs DDL operation on Cassandra using consistency level QUORUM.
510 * @param queryObject Object containing cassandra prepared query and values.
512 * @throws MusicServiceException
513 * @throws MusicQueryException
515 public ResultSet executeCriticalGet(PreparedQueryObject queryObject)
516 throws MusicServiceException, MusicQueryException {
517 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
518 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
519 throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
520 + queryObject.getQuery() + "]");
522 logger.info(EELFLoggerDelegate.applicationLogger,
523 "Executing Critical get query:" + queryObject.getQuery());
524 PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
525 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.QUORUM);
526 ResultSet results = null;
528 results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
529 } catch (Exception ex) {
530 logger.error("Exception", ex);
531 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
532 throw new MusicServiceException(ex.getMessage());
538 public ResultSet executeGet(PreparedQueryObject queryObject,String consistencyLevel) throws MusicQueryException, MusicServiceException {
539 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
540 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
541 throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
542 + queryObject.getQuery() + "]");
544 ResultSet results = null;
546 SimpleStatement statement = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray());
548 if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_ONE)) {
549 if(queryObject.getConsistency() == null) {
550 statement.setConsistencyLevel(ConsistencyLevel.ONE);
553 statement.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
556 else if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_QUORUM)) {
557 statement.setConsistencyLevel(ConsistencyLevel.QUORUM);
560 results = session.execute(statement);
562 } catch (Exception ex) {
563 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
564 throw new MusicServiceException(ex.getMessage());
572 * This method performs DDL operations on Cassandra using consistency level ONE.
574 * @param queryObject Object containing cassandra prepared query and values.
576 public ResultSet executeOneConsistencyGet(PreparedQueryObject queryObject)
577 throws MusicServiceException, MusicQueryException {
578 return executeGet(queryObject, CONSISTENCY_LEVEL_ONE);
583 * This method performs DDL operation on Cassandra using consistency level QUORUM.
585 * @param queryObject Object containing cassandra prepared query and values.
587 public ResultSet executeQuorumConsistencyGet(PreparedQueryObject queryObject)
588 throws MusicServiceException, MusicQueryException {
589 return executeGet(queryObject, CONSISTENCY_LEVEL_QUORUM);