2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * ===================================================================
7 * Modifications Copyright (c) 2018 IBM
8 * ===================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
21 * ============LICENSE_END=============================================
22 * ====================================================================
24 package org.onap.music.datastore;
26 import java.net.InetAddress;
27 import java.net.NetworkInterface;
28 import java.net.SocketException;
29 import java.nio.ByteBuffer;
30 import java.util.ArrayList;
31 import java.util.Enumeration;
32 import java.util.HashMap;
33 import java.util.Iterator;
36 import org.onap.music.eelf.logging.EELFLoggerDelegate;
37 import org.onap.music.eelf.logging.format.AppMessages;
38 import org.onap.music.eelf.logging.format.ErrorSeverity;
39 import org.onap.music.eelf.logging.format.ErrorTypes;
40 import org.onap.music.exceptions.MusicQueryException;
41 import org.onap.music.exceptions.MusicServiceException;
42 import org.onap.music.main.MusicUtil;
44 import com.datastax.driver.core.Cluster;
45 import com.datastax.driver.core.ColumnDefinitions;
46 import com.datastax.driver.core.ColumnDefinitions.Definition;
47 import com.datastax.driver.core.ConsistencyLevel;
48 import com.datastax.driver.core.DataType;
49 import com.datastax.driver.core.HostDistance;
50 import com.datastax.driver.core.KeyspaceMetadata;
51 import com.datastax.driver.core.Metadata;
52 import com.datastax.driver.core.PoolingOptions;
53 import com.datastax.driver.core.PreparedStatement;
54 import com.datastax.driver.core.ResultSet;
55 import com.datastax.driver.core.Row;
56 import com.datastax.driver.core.Session;
57 import com.datastax.driver.core.TableMetadata;
58 import com.datastax.driver.core.exceptions.AlreadyExistsException;
59 import com.datastax.driver.core.exceptions.InvalidQueryException;
60 import com.datastax.driver.core.exceptions.NoHostAvailableException;
66 public class MusicDataStore {
68 private Session session;
69 private Cluster cluster;
73 private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class);
78 public MusicDataStore() {
79 connectToCassaCluster();
87 public MusicDataStore(Cluster cluster, Session session) {
88 this.session = session;
89 this.cluster = cluster;
95 * @throws MusicServiceException
97 public MusicDataStore(String remoteIp) {
99 connectToCassaCluster(remoteIp);
100 } catch (MusicServiceException e) {
101 logger.error("Exception", e);
102 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage());
111 public void setSession(Session session) {
112 this.session = session;
118 public Session getSession() {
125 public void setCluster(Cluster cluster) {
126 this.cluster = cluster;
133 private ArrayList<String> getAllPossibleLocalIps() {
134 ArrayList<String> allPossibleIps = new ArrayList<>();
136 Enumeration<NetworkInterface> en = NetworkInterface.getNetworkInterfaces();
137 while (en.hasMoreElements()) {
138 NetworkInterface ni = en.nextElement();
139 Enumeration<InetAddress> ee = ni.getInetAddresses();
140 while (ee.hasMoreElements()) {
141 InetAddress ia = ee.nextElement();
142 allPossibleIps.add(ia.getHostAddress());
145 } catch (SocketException e) {
146 logger.error("Exception", e);
147 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.CONNCECTIVITYERROR, ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR);
148 }catch(Exception e) {
149 logger.error("Exception", e);
150 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), ErrorSeverity.ERROR, ErrorTypes.GENERALSERVICEERROR);
152 return allPossibleIps;
156 * This method iterates through all available IP addresses and connects to multiple cassandra
159 private void connectToCassaCluster() {
160 Iterator<String> it = getAllPossibleLocalIps().iterator();
161 String address = "localhost";
162 String[] addresses = null;
163 address = MusicUtil.getMyCassaHost();
164 addresses = address.split(",");
166 logger.info(EELFLoggerDelegate.applicationLogger,
167 "Connecting to cassa cluster: Iterating through possible ips:"
168 + getAllPossibleLocalIps());
169 PoolingOptions poolingOptions = new PoolingOptions();
171 .setConnectionsPerHost(HostDistance.LOCAL, 4, 10)
172 .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
173 while (it.hasNext()) {
175 if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
176 logger.info(EELFLoggerDelegate.applicationLogger,
177 "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd());
178 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
179 .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
180 //.withLoadBalancingPolicy(new RoundRobinPolicy())
181 .withPoolingOptions(poolingOptions)
182 .addContactPoints(addresses).build();
185 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
186 //.withLoadBalancingPolicy(new RoundRobinPolicy())
187 .addContactPoints(addresses).build();
189 Metadata metadata = cluster.getMetadata();
190 logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
191 + metadata.getClusterName() + " at " + address);
192 session = cluster.connect();
195 } catch (NoHostAvailableException e) {
196 logger.error("Exception", e);
198 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.HOSTUNAVAILABLE, ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR);
206 public void close() {
211 * This method connects to cassandra cluster on specific address.
215 private void connectToCassaCluster(String address) throws MusicServiceException {
216 String[] addresses = null;
217 addresses = address.split(",");
218 PoolingOptions poolingOptions = new PoolingOptions();
220 .setConnectionsPerHost(HostDistance.LOCAL, 4, 10)
221 .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
222 if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
223 logger.info(EELFLoggerDelegate.applicationLogger,
224 "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd());
225 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
226 .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
227 //.withLoadBalancingPolicy(new RoundRobinPolicy())
228 .withPoolingOptions(poolingOptions)
229 .addContactPoints(addresses).build();
232 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
233 //.withLoadBalancingPolicy(new RoundRobinPolicy())
234 .withPoolingOptions(poolingOptions)
235 .addContactPoints(addresses).build();
237 Metadata metadata = cluster.getMetadata();
238 logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
239 + metadata.getClusterName() + " at " + address);
241 session = cluster.connect();
242 } catch (Exception ex) {
243 logger.error("Exception", ex);
244 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.CASSANDRACONNECTIVITY, ErrorSeverity.ERROR, ErrorTypes.SERVICEUNAVAILABLE);
245 throw new MusicServiceException(
246 "Error while connecting to Cassandra cluster.. " + ex.getMessage());
257 public DataType returnColumnDataType(String keyspace, String tableName, String columnName) {
258 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
259 TableMetadata table = ks.getTable(tableName);
260 return table.getColumn(columnName).getType();
268 * @return TableMetadata
270 public TableMetadata returnColumnMetadata(String keyspace, String tableName) {
271 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
272 return ks.getTable(tableName);
277 * Utility function to return the Java specific object type.
284 public Object getColValue(Row row, String colName, DataType colType) {
286 switch (colType.getName()) {
288 return row.getString(colName);
290 return row.getUUID(colName);
292 return row.getVarint(colName);
294 return row.getLong(colName);
296 return row.getInt(colName);
298 return row.getFloat(colName);
300 return row.getDouble(colName);
302 return row.getBool(colName);
304 return row.getMap(colName, String.class, String.class);
306 return row.getList(colName, String.class);
312 public byte[] getBlobValue(Row row, String colName, DataType colType) {
313 ByteBuffer bb = row.getBytes(colName);
317 public boolean doesRowSatisfyCondition(Row row, Map<String, Object> condition) throws Exception {
318 ColumnDefinitions colInfo = row.getColumnDefinitions();
320 for (Map.Entry<String, Object> entry : condition.entrySet()) {
321 String colName = entry.getKey();
322 DataType colType = colInfo.getType(colName);
323 Object columnValue = getColValue(row, colName, colType);
324 Object conditionValue = MusicUtil.convertToActualDataType(colType, entry.getValue());
325 if (!columnValue.equals(conditionValue))
332 * Utility function to store ResultSet values in to a MAP for output.
337 public Map<String, HashMap<String, Object>> marshalData(ResultSet results) {
338 Map<String, HashMap<String, Object>> resultMap =
341 for (Row row : results) {
342 ColumnDefinitions colInfo = row.getColumnDefinitions();
343 HashMap<String, Object> resultOutput = new HashMap<>();
344 for (Definition definition : colInfo) {
345 if (!(("vector_ts").equals(definition.getName()))) {
346 if(definition.getType().toString().toLowerCase().contains("blob")) {
347 resultOutput.put(definition.getName(),
348 getBlobValue(row, definition.getName(), definition.getType()));
351 resultOutput.put(definition.getName(),
352 getColValue(row, definition.getName(), definition.getType()));
355 resultMap.put("row " + counter, resultOutput);
362 // Prepared Statements 1802 additions
364 * This Method performs DDL and DML operations on Cassandra using specified consistency level
366 * @param queryObject Object containing cassandra prepared query and values.
367 * @param consistency Specify consistency level for data synchronization across cassandra
369 * @return Boolean Indicates operation success or failure
370 * @throws MusicServiceException
371 * @throws MusicQueryException
373 public boolean executePut(PreparedQueryObject queryObject, String consistency)
374 throws MusicServiceException, MusicQueryException {
376 boolean result = false;
378 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
379 logger.error(EELFLoggerDelegate.errorLogger, queryObject.getQuery(),AppMessages.QUERYERROR, ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
380 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
381 + queryObject.getQuery() + "]");
383 logger.info(EELFLoggerDelegate.applicationLogger,
384 "In preprared Execute Put: the actual insert query:"
385 + queryObject.getQuery() + "; the values"
386 + queryObject.getValues());
387 PreparedStatement preparedInsert = null;
390 preparedInsert = session.prepare(queryObject.getQuery());
392 } catch(InvalidQueryException iqe) {
393 logger.error("Exception", iqe);
394 logger.error(EELFLoggerDelegate.errorLogger, iqe.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
395 throw new MusicQueryException(iqe.getMessage());
396 }catch(Exception e) {
397 logger.error("Exception", e);
398 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
399 throw new MusicQueryException(e.getMessage());
403 if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
404 logger.info(EELFLoggerDelegate.applicationLogger, "Executing critical put query");
405 preparedInsert.setConsistencyLevel(ConsistencyLevel.QUORUM);
406 } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
407 logger.info(EELFLoggerDelegate.applicationLogger, "Executing simple put query");
408 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
411 ResultSet rs = session.execute(preparedInsert.bind(queryObject.getValues().toArray()));
412 result = rs.wasApplied();
415 catch (AlreadyExistsException ae) {
416 logger.error("Exception", ae);
417 logger.error(EELFLoggerDelegate.errorLogger, ae.getMessage(),AppMessages.SESSIONFAILED+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
418 throw new MusicServiceException(ae.getMessage());
420 catch (Exception e) {
421 logger.error("Exception", e);
422 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.SESSIONFAILED+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
423 throw new MusicQueryException("Executing Session Failure for Request = " + "["
424 + queryObject.getQuery() + "]" + " Reason = " + e.getMessage());
432 * This method performs DDL operations on Cassandra using consistency level ONE.
434 * @param queryObject Object containing cassandra prepared query and values.
436 * @throws MusicServiceException
437 * @throws MusicQueryException
439 public ResultSet executeEventualGet(PreparedQueryObject queryObject)
440 throws MusicServiceException, MusicQueryException {
442 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
443 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
444 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
445 + queryObject.getQuery() + "]");
447 logger.info(EELFLoggerDelegate.applicationLogger,
448 "Executing Eventual get query:" + queryObject.getQuery());
450 ResultSet results = null;
452 PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
453 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE);
454 results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
456 } catch (Exception ex) {
457 logger.error("Exception", ex);
458 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
459 throw new MusicServiceException(ex.getMessage());
466 * This method performs DDL operation on Cassandra using consistency level QUORUM.
468 * @param queryObject Object containing cassandra prepared query and values.
470 * @throws MusicServiceException
471 * @throws MusicQueryException
473 public ResultSet executeCriticalGet(PreparedQueryObject queryObject)
474 throws MusicServiceException, MusicQueryException {
475 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
476 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
477 throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
478 + queryObject.getQuery() + "]");
480 logger.info(EELFLoggerDelegate.applicationLogger,
481 "Executing Critical get query:" + queryObject.getQuery());
482 PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
483 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.QUORUM);
484 ResultSet results = null;
486 results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
487 } catch (Exception ex) {
488 logger.error("Exception", ex);
489 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
490 throw new MusicServiceException(ex.getMessage());