X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fonap%2Fmusic%2Fdatastore%2FMusicDataStore.java;h=2b88b4a8a586e1fe75d4f953cf3fd31a77121d3f;hb=d6e7b63cc580e7b3822be61fe92a493ad5e222a3;hp=6ced1e48025298bf4ab7a7a69edae01fec66413d;hpb=3acbae6bf3be04f352bfaac340b8303064a2f7c4;p=music.git diff --git a/src/main/java/org/onap/music/datastore/MusicDataStore.java b/src/main/java/org/onap/music/datastore/MusicDataStore.java old mode 100644 new mode 100755 index 6ced1e48..2b88b4a8 --- a/src/main/java/org/onap/music/datastore/MusicDataStore.java +++ b/src/main/java/org/onap/music/datastore/MusicDataStore.java @@ -4,48 +4,66 @@ * =================================================================== * Copyright (c) 2017 AT&T Intellectual Property * =================================================================== + * Modifications Copyright (c) 2018-2019 IBM + * Modifications Copyright (c) 2019 Samsung + * =================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * + * * ============LICENSE_END============================================= * ==================================================================== */ + package org.onap.music.datastore; import java.net.InetAddress; import java.net.NetworkInterface; import java.net.SocketException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Enumeration; import java.util.HashMap; import java.util.Iterator; import java.util.Map; + +import org.apache.commons.jcs.access.CacheAccess; +import org.onap.music.authentication.CachingUtil; import org.onap.music.eelf.logging.EELFLoggerDelegate; +import org.onap.music.eelf.logging.format.AppMessages; +import org.onap.music.eelf.logging.format.ErrorSeverity; +import org.onap.music.eelf.logging.format.ErrorTypes; import org.onap.music.exceptions.MusicQueryException; import org.onap.music.exceptions.MusicServiceException; import org.onap.music.main.MusicUtil; +import com.codahale.metrics.JmxReporter; import com.datastax.driver.core.Cluster; import com.datastax.driver.core.ColumnDefinitions; import com.datastax.driver.core.ColumnDefinitions.Definition; import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.DataType; +import com.datastax.driver.core.HostDistance; import com.datastax.driver.core.KeyspaceMetadata; import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.PoolingOptions; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; +import com.datastax.driver.core.SimpleStatement; import com.datastax.driver.core.TableMetadata; +import com.datastax.driver.core.exceptions.AlreadyExistsException; +import com.datastax.driver.core.exceptions.InvalidQueryException; import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.sun.jersey.core.util.Base64; /** * @author nelson24 @@ -53,11 +71,12 @@ import com.datastax.driver.core.exceptions.NoHostAvailableException; */ public class MusicDataStore { + public static final String CONSISTENCY_LEVEL_ONE = "ONE"; + public static final String CONSISTENCY_LEVEL_QUORUM = "QUORUM"; private Session session; private Cluster cluster; - /** * @param session */ @@ -65,6 +84,13 @@ public class MusicDataStore { this.session = session; } + /** + * @param session + */ + public Session getSession() { + return session; + } + /** * @param cluster */ @@ -73,11 +99,10 @@ public class MusicDataStore { } - private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class); /** - * + * */ public MusicDataStore() { connectToCassaCluster(); @@ -94,7 +119,7 @@ public class MusicDataStore { } /** - * + * * @param remoteIp * @throws MusicServiceException */ @@ -102,28 +127,32 @@ public class MusicDataStore { try { connectToCassaCluster(remoteIp); } catch (MusicServiceException e) { - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage()); + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), e); } } /** - * + * * @return */ private ArrayList getAllPossibleLocalIps() { - ArrayList allPossibleIps = new ArrayList(); + ArrayList allPossibleIps = new ArrayList<>(); try { Enumeration en = NetworkInterface.getNetworkInterfaces(); while (en.hasMoreElements()) { - NetworkInterface ni = (NetworkInterface) en.nextElement(); + NetworkInterface ni = en.nextElement(); Enumeration ee = ni.getInetAddresses(); while (ee.hasMoreElements()) { - InetAddress ia = (InetAddress) ee.nextElement(); + InetAddress ia = ee.nextElement(); allPossibleIps.add(ia.getHostAddress()); } } } catch (SocketException e) { - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage()); + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.CONNCECTIVITYERROR, + ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR, e); + }catch(Exception e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), ErrorSeverity.ERROR, ErrorTypes + .GENERALSERVICEERROR, e); } return allPossibleIps; } @@ -135,14 +164,34 @@ public class MusicDataStore { private void connectToCassaCluster() { Iterator it = getAllPossibleLocalIps().iterator(); String address = "localhost"; + String[] addresses = null; + address = MusicUtil.getMyCassaHost(); + addresses = address.split(","); + logger.info(EELFLoggerDelegate.applicationLogger, "Connecting to cassa cluster: Iterating through possible ips:" + getAllPossibleLocalIps()); + PoolingOptions poolingOptions = new PoolingOptions(); + poolingOptions + .setConnectionsPerHost(HostDistance.LOCAL, 4, 10) + .setConnectionsPerHost(HostDistance.REMOTE, 2, 4); while (it.hasNext()) { try { - cluster = Cluster.builder().withPort(9042) - .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd()) - .addContactPoint(address).build(); + if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) { + logger.info(EELFLoggerDelegate.applicationLogger, + "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd()); + cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort()) + .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd()) + //.withLoadBalancingPolicy(new RoundRobinPolicy()) + .withoutJMXReporting() + .withPoolingOptions(poolingOptions) + .addContactPoints(addresses).build(); + } + else + cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort()) + //.withLoadBalancingPolicy(new RoundRobinPolicy()) + .addContactPoints(addresses).build(); + Metadata metadata = cluster.getMetadata(); logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster " + metadata.getClusterName() + " at " + address); @@ -151,13 +200,14 @@ public class MusicDataStore { break; } catch (NoHostAvailableException e) { address = it.next(); - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage()); + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.HOSTUNAVAILABLE, + ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR, e); } } } /** - * + * */ public void close() { session.close(); @@ -165,27 +215,55 @@ public class MusicDataStore { /** * This method connects to cassandra cluster on specific address. - * + * * @param address */ private void connectToCassaCluster(String address) throws MusicServiceException { - cluster = Cluster.builder().withPort(9042) + String[] addresses = null; + addresses = address.split(","); + PoolingOptions poolingOptions = new PoolingOptions(); + poolingOptions + .setConnectionsPerHost(HostDistance.LOCAL, 4, 10) + .setConnectionsPerHost(HostDistance.REMOTE, 2, 4); + if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) { + logger.info(EELFLoggerDelegate.applicationLogger, + "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd()); + cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort()) .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd()) - .addContactPoint(address).build(); + //.withLoadBalancingPolicy(new RoundRobinPolicy()) + .withoutJMXReporting() + .withPoolingOptions(poolingOptions) + .addContactPoints(addresses).build(); + } else { + cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort()) + //.withLoadBalancingPolicy(new RoundRobinPolicy()) + .withoutJMXReporting() + .withPoolingOptions(poolingOptions) + .addContactPoints(addresses).build(); + } + + // JmxReporter reporter = + // JmxReporter.forRegistry(cluster.getMetrics().getRegistry()) + // .inDomain(cluster.getClusterName() + "-metrics") + // .build(); + + // reporter.start(); + Metadata metadata = cluster.getMetadata(); logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster " + metadata.getClusterName() + " at " + address); try { session = cluster.connect(); } catch (Exception ex) { - logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage()); + logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.CASSANDRACONNECTIVITY, + ErrorSeverity.ERROR, ErrorTypes.SERVICEUNAVAILABLE, ex); throw new MusicServiceException( "Error while connecting to Cassandra cluster.. " + ex.getMessage()); } } /** - * + * * @param keyspace * @param tableName * @param columnName @@ -199,7 +277,7 @@ public class MusicDataStore { } /** - * + * * @param keyspace * @param tableName * @return TableMetadata @@ -208,11 +286,22 @@ public class MusicDataStore { KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace); return ks.getTable(tableName); } + + /** + * + * @param keyspace + * @param tableName + * @return TableMetadata + */ + public KeyspaceMetadata returnKeyspaceMetadata(String keyspace) { + KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace); + return ks; + } /** * Utility function to return the Java specific object type. - * + * * @param row * @param colName * @param colType @@ -239,12 +328,19 @@ public class MusicDataStore { return row.getBool(colName); case MAP: return row.getMap(colName, String.class, String.class); + case LIST: + return row.getList(colName, String.class); default: return null; } } - public boolean doesRowSatisfyCondition(Row row, Map condition) { + public byte[] getBlobValue(Row row, String colName, DataType colType) { + ByteBuffer bb = row.getBytes(colName); + return bb.array(); + } + + public boolean doesRowSatisfyCondition(Row row, Map condition) throws Exception { ColumnDefinitions colInfo = row.getColumnDefinitions(); for (Map.Entry entry : condition.entrySet()) { @@ -260,21 +356,27 @@ public class MusicDataStore { /** * Utility function to store ResultSet values in to a MAP for output. - * + * * @param results * @return MAP */ public Map> marshalData(ResultSet results) { Map> resultMap = - new HashMap>(); + new HashMap<>(); int counter = 0; for (Row row : results) { ColumnDefinitions colInfo = row.getColumnDefinitions(); - HashMap resultOutput = new HashMap(); + HashMap resultOutput = new HashMap<>(); for (Definition definition : colInfo) { - if (!definition.getName().equals("vector_ts")) - resultOutput.put(definition.getName(), + if (!(("vector_ts").equals(definition.getName()))) { + if(definition.getType().toString().toLowerCase().contains("blob")) { + resultOutput.put(definition.getName(), + getBlobValue(row, definition.getName(), definition.getType())); + } else { + resultOutput.put(definition.getName(), getColValue(row, definition.getName(), definition.getType())); + } + } } resultMap.put("row " + counter, resultOutput); counter++; @@ -284,9 +386,14 @@ public class MusicDataStore { // Prepared Statements 1802 additions + + public boolean executePut(PreparedQueryObject queryObject, String consistency) + throws MusicServiceException, MusicQueryException { + return executePut(queryObject, consistency, 0); + } /** * This Method performs DDL and DML operations on Cassandra using specified consistency level - * + * * @param queryObject Object containing cassandra prepared query and values. * @param consistency Specify consistency level for data synchronization across cassandra * replicas @@ -294,14 +401,13 @@ public class MusicDataStore { * @throws MusicServiceException * @throws MusicQueryException */ - public boolean executePut(PreparedQueryObject queryObject, String consistency) + public boolean executePut(PreparedQueryObject queryObject, String consistency,long timeSlot) throws MusicServiceException, MusicQueryException { boolean result = false; - + long timeOfWrite = System.currentTimeMillis(); if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) { - logger.error(EELFLoggerDelegate.errorLogger, - "Error while processing prepared query object"); + logger.error(EELFLoggerDelegate.errorLogger, queryObject.getQuery(),AppMessages.QUERYERROR, ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); throw new MusicQueryException("Ill formed queryObject for the request = " + "[" + queryObject.getQuery() + "]"); } @@ -309,22 +415,41 @@ public class MusicDataStore { "In preprared Execute Put: the actual insert query:" + queryObject.getQuery() + "; the values" + queryObject.getValues()); - PreparedStatement preparedInsert = session.prepare(queryObject.getQuery()); + SimpleStatement preparedInsert = null; + try { + preparedInsert = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray()); if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) { logger.info(EELFLoggerDelegate.applicationLogger, "Executing critical put query"); preparedInsert.setConsistencyLevel(ConsistencyLevel.QUORUM); } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) { logger.info(EELFLoggerDelegate.applicationLogger, "Executing simple put query"); + if(queryObject.getConsistency() == null) + preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE); + else + preparedInsert.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency())); + } else if (consistency.equalsIgnoreCase(MusicUtil.ONE)) { preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE); + } else if (consistency.equalsIgnoreCase(MusicUtil.QUORUM)) { + preparedInsert.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM); + } else if (consistency.equalsIgnoreCase(MusicUtil.ALL)) { + preparedInsert.setConsistencyLevel(ConsistencyLevel.ALL); } + long timestamp = MusicUtil.v2sTimeStampInMicroseconds(timeSlot, timeOfWrite); + preparedInsert.setDefaultTimestamp(timestamp); - ResultSet rs = session.execute(preparedInsert.bind(queryObject.getValues().toArray())); + ResultSet rs = session.execute(preparedInsert); result = rs.wasApplied(); - } catch (Exception e) { - logger.error(EELFLoggerDelegate.errorLogger, "Executing Session Failure for Request = " - + "[" + queryObject.getQuery() + "]" + " Reason = " + e.getMessage()); + } + catch (AlreadyExistsException ae) { + logger.error(EELFLoggerDelegate.errorLogger, ae.getMessage(),AppMessages.SESSIONFAILED+ " [" + + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, ae); + throw new MusicServiceException(ae.getMessage()); + } + catch (Exception e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.SESSIONFAILED + " [" + + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, e); throw new MusicServiceException("Executing Session Failure for Request = " + "[" + queryObject.getQuery() + "]" + " Reason = " + e.getMessage()); } @@ -333,50 +458,63 @@ public class MusicDataStore { return result; } - /** + /* *//** * This method performs DDL operations on Cassandra using consistency level ONE. - * + * * @param queryObject Object containing cassandra prepared query and values. * @return ResultSet * @throws MusicServiceException * @throws MusicQueryException - */ + *//* public ResultSet executeEventualGet(PreparedQueryObject queryObject) throws MusicServiceException, MusicQueryException { - + CacheAccess queryBank = CachingUtil.getStatementBank(); + PreparedStatement preparedEventualGet = null; if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) { + logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); throw new MusicQueryException("Ill formed queryObject for the request = " + "[" + queryObject.getQuery() + "]"); } logger.info(EELFLoggerDelegate.applicationLogger, "Executing Eventual get query:" + queryObject.getQuery()); - PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery()); - preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE); + ResultSet results = null; try { + if(queryBank.get(queryObject.getQuery()) != null ) + preparedEventualGet=queryBank.get(queryObject.getQuery()); + else { + preparedEventualGet = session.prepare(queryObject.getQuery()); + CachingUtil.updateStatementBank(queryObject.getQuery(), preparedEventualGet); + } + if(queryObject.getConsistency() == null) { + preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE); + } else { + preparedEventualGet.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency())); + } results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray())); } catch (Exception ex) { - logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage()); + logger.error("Exception", ex); + logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); throw new MusicServiceException(ex.getMessage()); } return results; } - /** - * + *//** + * * This method performs DDL operation on Cassandra using consistency level QUORUM. - * + * * @param queryObject Object containing cassandra prepared query and values. * @return ResultSet * @throws MusicServiceException * @throws MusicQueryException - */ + *//* public ResultSet executeCriticalGet(PreparedQueryObject queryObject) throws MusicServiceException, MusicQueryException { if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) { - logger.error(EELFLoggerDelegate.errorLogger, "Error processing Prepared Query Object"); - throw new MusicQueryException("Ill formed queryObject for the request = " + "[" + logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); + throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "[" + queryObject.getQuery() + "]"); } logger.info(EELFLoggerDelegate.applicationLogger, @@ -387,11 +525,66 @@ public class MusicDataStore { try { results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray())); } catch (Exception ex) { - logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage()); + logger.error("Exception", ex); + logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); throw new MusicServiceException(ex.getMessage()); } return results; } + */ + public ResultSet executeGet(PreparedQueryObject queryObject,String consistencyLevel) throws MusicQueryException, MusicServiceException { + if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) { + logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); + throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "[" + + queryObject.getQuery() + "]"); + } + ResultSet results = null; + try { + SimpleStatement statement = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray()); + + if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_ONE)) { + if(queryObject.getConsistency() == null) { + statement.setConsistencyLevel(ConsistencyLevel.ONE); + } else { + statement.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency())); + } + } + else if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_QUORUM)) { + statement.setConsistencyLevel(ConsistencyLevel.QUORUM); + } + + results = session.execute(statement); + + } catch (Exception ex) { + logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject + .getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, ex); + throw new MusicServiceException(ex.getMessage()); + } + + return results; + + } + + /** + * This method performs DDL operations on Cassandra using consistency level ONE. + * + * @param queryObject Object containing cassandra prepared query and values. + */ + public ResultSet executeOneConsistencyGet(PreparedQueryObject queryObject) + throws MusicServiceException, MusicQueryException { + return executeGet(queryObject, CONSISTENCY_LEVEL_ONE); + } + + /** + * + * This method performs DDL operation on Cassandra using consistency level QUORUM. + * + * @param queryObject Object containing cassandra prepared query and values. + */ + public ResultSet executeQuorumConsistencyGet(PreparedQueryObject queryObject) + throws MusicServiceException, MusicQueryException { + return executeGet(queryObject, CONSISTENCY_LEVEL_QUORUM); + } }