2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * ===================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 * ============LICENSE_END=============================================
20 * ====================================================================
22 package org.onap.music.datastore;
24 import java.net.InetAddress;
25 import java.net.NetworkInterface;
26 import java.net.SocketException;
27 import java.nio.ByteBuffer;
28 import java.util.ArrayList;
29 import java.util.Enumeration;
30 import java.util.HashMap;
31 import java.util.Iterator;
33 import org.onap.music.eelf.logging.EELFLoggerDelegate;
34 import org.onap.music.eelf.logging.format.AppMessages;
35 import org.onap.music.eelf.logging.format.ErrorSeverity;
36 import org.onap.music.eelf.logging.format.ErrorTypes;
37 import org.onap.music.exceptions.MusicQueryException;
38 import org.onap.music.exceptions.MusicServiceException;
39 import org.onap.music.main.MusicUtil;
40 import com.datastax.driver.core.Cluster;
41 import com.datastax.driver.core.ColumnDefinitions;
42 import com.datastax.driver.core.ColumnDefinitions.Definition;
43 import com.datastax.driver.core.ConsistencyLevel;
44 import com.datastax.driver.core.DataType;
45 import com.datastax.driver.core.HostDistance;
46 import com.datastax.driver.core.KeyspaceMetadata;
47 import com.datastax.driver.core.Metadata;
48 import com.datastax.driver.core.PoolingOptions;
49 import com.datastax.driver.core.PreparedStatement;
50 import com.datastax.driver.core.ResultSet;
51 import com.datastax.driver.core.Row;
52 import com.datastax.driver.core.Session;
53 import com.datastax.driver.core.TableMetadata;
54 import com.datastax.driver.core.exceptions.AlreadyExistsException;
55 import com.datastax.driver.core.exceptions.InvalidQueryException;
56 import com.datastax.driver.core.exceptions.NoHostAvailableException;
57 import com.sun.jersey.core.util.Base64;
/**
 * Data-access helper that holds a DataStax {@code Cluster}/{@code Session} pair and uses it to prepare and
 * execute CQL statements and to look up schema metadata.
 */
63 public class MusicDataStore {
// Session used by the execute* methods to prepare and run statements.
65 private Session session;
// Cluster handle; the session is opened from it and schema metadata is read through it.
66 private Cluster cluster;
73 public void setSession(Session session) {
74 this.session = session;
80 public Session getSession() {
87 public void setCluster(Cluster cluster) {
88 this.cluster = cluster;
// Per-instance logger for this class; callers in this file pass either
// EELFLoggerDelegate.applicationLogger or EELFLoggerDelegate.errorLogger as the first argument.
93 private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class);
/**
 * Creates a data store and immediately tries to connect it to the Cassandra host(s) reported by
 * {@code MusicUtil.getMyCassaHost()} (see the private no-argument {@code connectToCassaCluster}).
 */
public MusicDataStore() {
    this.connectToCassaCluster();
}
/**
 * Creates a data store around an already established cluster and session; no connection is opened here.
 *
 * @param cluster cluster handle to use for metadata lookups
 * @param session session to use for preparing and executing statements
 */
public MusicDataStore(Cluster cluster, Session session) {
    this.cluster = cluster;
    this.session = session;
}
/**
 * Creates a data store connected to the given Cassandra contact point(s).
 * <p>
 * A connection failure is NOT propagated: the {@link MusicServiceException} raised by
 * {@code connectToCassaCluster(String)} is only written to the error log, so the instance may be returned
 * without a usable session.
 *
 * @param remoteIp one host, or several hosts separated by commas, to use as contact points
 */
public MusicDataStore(String remoteIp) {
    try {
        this.connectToCassaCluster(remoteIp);
    } catch (MusicServiceException connectFailure) {
        logger.error(EELFLoggerDelegate.errorLogger, connectFailure.getMessage());
    }
}
129 private ArrayList<String> getAllPossibleLocalIps() {
130 ArrayList<String> allPossibleIps = new ArrayList<String>();
132 Enumeration<NetworkInterface> en = NetworkInterface.getNetworkInterfaces();
133 while (en.hasMoreElements()) {
134 NetworkInterface ni = (NetworkInterface) en.nextElement();
135 Enumeration<InetAddress> ee = ni.getInetAddresses();
136 while (ee.hasMoreElements()) {
137 InetAddress ia = (InetAddress) ee.nextElement();
138 allPossibleIps.add(ia.getHostAddress());
141 } catch (SocketException e) {
142 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.CONNCECTIVITYERROR, ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR);
143 }catch(Exception e) {
144 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), ErrorSeverity.ERROR, ErrorTypes.GENERALSERVICEERROR);
146 return allPossibleIps;
150 * This method iterates through all available IP addresses and connects to multiple cassandra
153 private void connectToCassaCluster() {
154 Iterator<String> it = getAllPossibleLocalIps().iterator();
155 String address = "localhost";
156 String[] addresses = null;
157 address = MusicUtil.getMyCassaHost();
158 addresses = address.split(",");
160 logger.info(EELFLoggerDelegate.applicationLogger,
161 "Connecting to cassa cluster: Iterating through possible ips:"
162 + getAllPossibleLocalIps());
163 PoolingOptions poolingOptions = new PoolingOptions();
165 .setConnectionsPerHost(HostDistance.LOCAL, 4, 10)
166 .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
167 while (it.hasNext()) {
169 if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
170 logger.info(EELFLoggerDelegate.applicationLogger,
171 "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd());
172 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
173 .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
174 //.withLoadBalancingPolicy(new RoundRobinPolicy())
175 .withPoolingOptions(poolingOptions)
176 .addContactPoints(addresses).build();
179 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
180 //.withLoadBalancingPolicy(new RoundRobinPolicy())
181 .addContactPoints(addresses).build();
183 Metadata metadata = cluster.getMetadata();
184 logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
185 + metadata.getClusterName() + " at " + address);
186 session = cluster.connect();
189 } catch (NoHostAvailableException e) {
191 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.HOSTUNAVAILABLE, ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR);
// NOTE(review): only the signature of close() is present in this copy of the file; its body and closing
// brace are missing. Presumably it releases the Cassandra session and/or cluster held by this instance -
// confirm against the complete source before relying on that.
199 public void close() {
204 * This method connects to cassandra cluster on specific address.
208 private void connectToCassaCluster(String address) throws MusicServiceException {
209 String[] addresses = null;
210 addresses = address.split(",");
211 PoolingOptions poolingOptions = new PoolingOptions();
213 .setConnectionsPerHost(HostDistance.LOCAL, 4, 10)
214 .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
215 if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
216 logger.info(EELFLoggerDelegate.applicationLogger,
217 "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd());
218 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
219 .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
220 //.withLoadBalancingPolicy(new RoundRobinPolicy())
221 .withPoolingOptions(poolingOptions)
222 .addContactPoints(addresses).build();
225 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
226 //.withLoadBalancingPolicy(new RoundRobinPolicy())
227 .withPoolingOptions(poolingOptions)
228 .addContactPoints(addresses).build();
230 Metadata metadata = cluster.getMetadata();
231 logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
232 + metadata.getClusterName() + " at " + address);
234 session = cluster.connect();
235 } catch (Exception ex) {
236 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.CASSANDRACONNECTIVITY, ErrorSeverity.ERROR, ErrorTypes.SERVICEUNAVAILABLE);
237 throw new MusicServiceException(
238 "Error while connecting to Cassandra cluster.. " + ex.getMessage());
249 public DataType returnColumnDataType(String keyspace, String tableName, String columnName) {
250 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
251 TableMetadata table = ks.getTable(tableName);
252 return table.getColumn(columnName).getType();
260 * @return TableMetadata
262 public TableMetadata returnColumnMetadata(String keyspace, String tableName) {
263 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
264 return ks.getTable(tableName);
269 * Utility function to return the Java specific object type.
/**
 * Reads one column of a row through the typed {@code Row} getter selected by the column's data type.
 *
 * NOTE(review): the {@code case} labels of the switch, any default arm and the closing braces are missing
 * from this copy of the file, so which type name maps to which getter cannot be confirmed from here.
 *
 * @param row row to read from
 * @param colName name of the column to read
 * @param colType type of the column; {@code colType.getName()} is the switch selector
 * @return the value produced by the selected getter
 */
276 public Object getColValue(Row row, String colName, DataType colType) {
// Dispatch on the type name of the column.
278 switch (colType.getName()) {
// Each return below is presumably the body of its own switch arm (labels not visible - TODO confirm).
280 return row.getString(colName);
282 return row.getUUID(colName);
284 return row.getVarint(colName);
286 return row.getLong(colName);
288 return row.getInt(colName);
290 return row.getFloat(colName);
292 return row.getDouble(colName);
294 return row.getBool(colName);
// Collections are read with String keys/values resp. String elements only.
296 return row.getMap(colName, String.class, String.class);
298 return row.getList(colName, String.class);
304 public byte[] getBlobValue(Row row, String colName, DataType colType) {
305 ByteBuffer bb = row.getBytes(colName);
306 byte[] data = bb.array();
310 public boolean doesRowSatisfyCondition(Row row, Map<String, Object> condition) throws Exception {
311 ColumnDefinitions colInfo = row.getColumnDefinitions();
313 for (Map.Entry<String, Object> entry : condition.entrySet()) {
314 String colName = entry.getKey();
315 DataType colType = colInfo.getType(colName);
316 Object columnValue = getColValue(row, colName, colType);
317 Object conditionValue = MusicUtil.convertToActualDataType(colType, entry.getValue());
318 if (columnValue.equals(conditionValue) == false)
325 * Utility function to store ResultSet values in to a MAP for output.
/**
 * Converts a result set into a map of rows, each row being a map from column name to column value.
 *
 * NOTE(review): the declaration, initial value and update of {@code counter}, the {@code else} between the
 * two {@code put} calls, the closing braces and the final return are missing from this copy of the file.
 *
 * @param results result set to convert
 * @return presumably {@code resultMap}, keyed by {@code "row " + counter} - confirm against the full source
 */
330 public Map<String, HashMap<String, Object>> marshalData(ResultSet results) {
331 Map<String, HashMap<String, Object>> resultMap =
332 new HashMap<String, HashMap<String, Object>>();
334 for (Row row : results) {
335 ColumnDefinitions colInfo = row.getColumnDefinitions();
// One map per row: column name -> value.
336 HashMap<String, Object> resultOutput = new HashMap<String, Object>();
337 for (Definition definition : colInfo) {
// The column named "vector_ts" is left out of the output.
338 if (!definition.getName().equals("vector_ts")) {
// Any type whose textual form contains "blob" is read as raw bytes via getBlobValue.
339 if(definition.getType().toString().toLowerCase().contains("blob")) {
340 resultOutput.put(definition.getName(),
341 getBlobValue(row, definition.getName(), definition.getType()));
// Presumably the non-blob branch: every other column is read via getColValue.
344 resultOutput.put(definition.getName(),
345 getColValue(row, definition.getName(), definition.getType()));
// The row map is stored under the key "row " followed by the current counter value.
348 resultMap.put("row " + counter, resultOutput);
355 // Prepared Statements 1802 additions
357 * This Method performs DDL and DML operations on Cassandra using specified consistency level
359 * @param queryObject Object containing cassandra prepared query and values.
360 * @param consistency Specify consistency level for data synchronization across cassandra
362 * @return Boolean Indicates operation success or failure
363 * @throws MusicServiceException
364 * @throws MusicQueryException
366 public boolean executePut(PreparedQueryObject queryObject, String consistency)
367 throws MusicServiceException, MusicQueryException {
369 boolean result = false;
371 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
372 logger.error(EELFLoggerDelegate.errorLogger, queryObject.getQuery(),AppMessages.QUERYERROR, ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
373 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
374 + queryObject.getQuery() + "]");
376 logger.info(EELFLoggerDelegate.applicationLogger,
377 "In preprared Execute Put: the actual insert query:"
378 + queryObject.getQuery() + "; the values"
379 + queryObject.getValues());
380 PreparedStatement preparedInsert = null;
383 preparedInsert = session.prepare(queryObject.getQuery());
385 } catch(InvalidQueryException iqe) {
386 logger.error(EELFLoggerDelegate.errorLogger, iqe.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
387 throw new MusicQueryException(iqe.getMessage());
388 }catch(Exception e) {
389 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
390 throw new MusicQueryException(e.getMessage());
394 if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
395 logger.info(EELFLoggerDelegate.applicationLogger, "Executing critical put query");
396 preparedInsert.setConsistencyLevel(ConsistencyLevel.QUORUM);
397 } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
398 logger.info(EELFLoggerDelegate.applicationLogger, "Executing simple put query");
399 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
402 ResultSet rs = session.execute(preparedInsert.bind(queryObject.getValues().toArray()));
403 result = rs.wasApplied();
406 catch (AlreadyExistsException ae) {
407 logger.error(EELFLoggerDelegate.errorLogger, ae.getMessage(),AppMessages.SESSIONFAILED+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
408 throw new MusicServiceException(ae.getMessage());
410 catch (Exception e) {
411 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.SESSIONFAILED+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
412 throw new MusicQueryException("Executing Session Failure for Request = " + "["
413 + queryObject.getQuery() + "]" + " Reason = " + e.getMessage());
421 * This method performs DDL operations on Cassandra using consistency level ONE.
423 * @param queryObject Object containing cassandra prepared query and values.
425 * @throws MusicServiceException
426 * @throws MusicQueryException
428 public ResultSet executeEventualGet(PreparedQueryObject queryObject)
429 throws MusicServiceException, MusicQueryException {
431 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
432 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
433 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
434 + queryObject.getQuery() + "]");
436 logger.info(EELFLoggerDelegate.applicationLogger,
437 "Executing Eventual get query:" + queryObject.getQuery());
439 ResultSet results = null;
441 PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
442 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE);
443 results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
445 } catch (Exception ex) {
446 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
447 throw new MusicServiceException(ex.getMessage());
454 * This method performs DDL operation on Cassandra using consistency level QUORUM.
456 * @param queryObject Object containing cassandra prepared query and values.
458 * @throws MusicServiceException
459 * @throws MusicQueryException
461 public ResultSet executeCriticalGet(PreparedQueryObject queryObject)
462 throws MusicServiceException, MusicQueryException {
463 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
464 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
465 throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
466 + queryObject.getQuery() + "]");
468 logger.info(EELFLoggerDelegate.applicationLogger,
469 "Executing Critical get query:" + queryObject.getQuery());
470 PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
471 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.QUORUM);
472 ResultSet results = null;
474 results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
475 } catch (Exception ex) {
476 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
477 throw new MusicServiceException(ex.getMessage());