2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * ===================================================================
7 * Modifications Copyright (c) 2018 IBM
8 * ===================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
21 * ============LICENSE_END=============================================
22 * ====================================================================
24 package org.onap.music.datastore;
26 import java.net.InetAddress;
27 import java.net.NetworkInterface;
28 import java.net.SocketException;
29 import java.nio.ByteBuffer;
30 import java.util.ArrayList;
31 import java.util.Enumeration;
32 import java.util.HashMap;
33 import java.util.Iterator;
36 import org.onap.music.eelf.logging.EELFLoggerDelegate;
37 import org.onap.music.eelf.logging.format.AppMessages;
38 import org.onap.music.eelf.logging.format.ErrorSeverity;
39 import org.onap.music.eelf.logging.format.ErrorTypes;
40 import org.onap.music.exceptions.MusicQueryException;
41 import org.onap.music.exceptions.MusicServiceException;
42 import org.onap.music.main.MusicUtil;
44 import com.datastax.driver.core.Cluster;
45 import com.datastax.driver.core.ColumnDefinitions;
46 import com.datastax.driver.core.ColumnDefinitions.Definition;
47 import com.datastax.driver.core.ConsistencyLevel;
48 import com.datastax.driver.core.DataType;
49 import com.datastax.driver.core.HostDistance;
50 import com.datastax.driver.core.KeyspaceMetadata;
51 import com.datastax.driver.core.Metadata;
52 import com.datastax.driver.core.PoolingOptions;
53 import com.datastax.driver.core.PreparedStatement;
54 import com.datastax.driver.core.ResultSet;
55 import com.datastax.driver.core.Row;
56 import com.datastax.driver.core.Session;
57 import com.datastax.driver.core.TableMetadata;
58 import com.datastax.driver.core.exceptions.AlreadyExistsException;
59 import com.datastax.driver.core.exceptions.InvalidQueryException;
60 import com.datastax.driver.core.exceptions.NoHostAvailableException;
66 public class MusicDataStore {
68 private Session session;
69 private Cluster cluster;
76 public void setSession(Session session) {
77 this.session = session;
83 public Session getSession() {
90 public void setCluster(Cluster cluster) {
91 this.cluster = cluster;
96 private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class);
101 public MusicDataStore() {
102 connectToCassaCluster();
110 public MusicDataStore(Cluster cluster, Session session) {
111 this.session = session;
112 this.cluster = cluster;
118 * @throws MusicServiceException
120 public MusicDataStore(String remoteIp) {
122 connectToCassaCluster(remoteIp);
123 } catch (MusicServiceException e) {
124 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage());
132 private ArrayList<String> getAllPossibleLocalIps() {
133 ArrayList<String> allPossibleIps = new ArrayList<>();
135 Enumeration<NetworkInterface> en = NetworkInterface.getNetworkInterfaces();
136 while (en.hasMoreElements()) {
137 NetworkInterface ni = (NetworkInterface) en.nextElement();
138 Enumeration<InetAddress> ee = ni.getInetAddresses();
139 while (ee.hasMoreElements()) {
140 InetAddress ia = (InetAddress) ee.nextElement();
141 allPossibleIps.add(ia.getHostAddress());
144 } catch (SocketException e) {
145 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.CONNCECTIVITYERROR, ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR);
146 }catch(Exception e) {
147 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), ErrorSeverity.ERROR, ErrorTypes.GENERALSERVICEERROR);
149 return allPossibleIps;
153 * This method iterates through all available IP addresses and connects to multiple cassandra
156 private void connectToCassaCluster() {
157 Iterator<String> it = getAllPossibleLocalIps().iterator();
158 String address = "localhost";
159 String[] addresses = null;
160 address = MusicUtil.getMyCassaHost();
161 addresses = address.split(",");
163 logger.info(EELFLoggerDelegate.applicationLogger,
164 "Connecting to cassa cluster: Iterating through possible ips:"
165 + getAllPossibleLocalIps());
166 PoolingOptions poolingOptions = new PoolingOptions();
168 .setConnectionsPerHost(HostDistance.LOCAL, 4, 10)
169 .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
170 while (it.hasNext()) {
172 if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
173 logger.info(EELFLoggerDelegate.applicationLogger,
174 "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd());
175 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
176 .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
177 //.withLoadBalancingPolicy(new RoundRobinPolicy())
178 .withPoolingOptions(poolingOptions)
179 .addContactPoints(addresses).build();
182 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
183 //.withLoadBalancingPolicy(new RoundRobinPolicy())
184 .addContactPoints(addresses).build();
186 Metadata metadata = cluster.getMetadata();
187 logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
188 + metadata.getClusterName() + " at " + address);
189 session = cluster.connect();
192 } catch (NoHostAvailableException e) {
194 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.HOSTUNAVAILABLE, ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR);
202 public void close() {
207 * This method connects to cassandra cluster on specific address.
211 private void connectToCassaCluster(String address) throws MusicServiceException {
212 String[] addresses = null;
213 addresses = address.split(",");
214 PoolingOptions poolingOptions = new PoolingOptions();
216 .setConnectionsPerHost(HostDistance.LOCAL, 4, 10)
217 .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
218 if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
219 logger.info(EELFLoggerDelegate.applicationLogger,
220 "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd());
221 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
222 .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
223 //.withLoadBalancingPolicy(new RoundRobinPolicy())
224 .withPoolingOptions(poolingOptions)
225 .addContactPoints(addresses).build();
228 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
229 //.withLoadBalancingPolicy(new RoundRobinPolicy())
230 .withPoolingOptions(poolingOptions)
231 .addContactPoints(addresses).build();
233 Metadata metadata = cluster.getMetadata();
234 logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
235 + metadata.getClusterName() + " at " + address);
237 session = cluster.connect();
238 } catch (Exception ex) {
239 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.CASSANDRACONNECTIVITY, ErrorSeverity.ERROR, ErrorTypes.SERVICEUNAVAILABLE);
240 throw new MusicServiceException(
241 "Error while connecting to Cassandra cluster.. " + ex.getMessage());
252 public DataType returnColumnDataType(String keyspace, String tableName, String columnName) {
253 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
254 TableMetadata table = ks.getTable(tableName);
255 return table.getColumn(columnName).getType();
263 * @return TableMetadata
265 public TableMetadata returnColumnMetadata(String keyspace, String tableName) {
266 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
267 return ks.getTable(tableName);
272 * Utility function to return the Java specific object type.
279 public Object getColValue(Row row, String colName, DataType colType) {
281 switch (colType.getName()) {
283 return row.getString(colName);
285 return row.getUUID(colName);
287 return row.getVarint(colName);
289 return row.getLong(colName);
291 return row.getInt(colName);
293 return row.getFloat(colName);
295 return row.getDouble(colName);
297 return row.getBool(colName);
299 return row.getMap(colName, String.class, String.class);
301 return row.getList(colName, String.class);
307 public byte[] getBlobValue(Row row, String colName, DataType colType) {
308 ByteBuffer bb = row.getBytes(colName);
309 byte[] data = bb.array();
313 public boolean doesRowSatisfyCondition(Row row, Map<String, Object> condition) throws Exception {
314 ColumnDefinitions colInfo = row.getColumnDefinitions();
316 for (Map.Entry<String, Object> entry : condition.entrySet()) {
317 String colName = entry.getKey();
318 DataType colType = colInfo.getType(colName);
319 Object columnValue = getColValue(row, colName, colType);
320 Object conditionValue = MusicUtil.convertToActualDataType(colType, entry.getValue());
321 if (!columnValue.equals(conditionValue))
328 * Utility function to store ResultSet values in to a MAP for output.
333 public Map<String, HashMap<String, Object>> marshalData(ResultSet results) {
334 Map<String, HashMap<String, Object>> resultMap =
337 for (Row row : results) {
338 ColumnDefinitions colInfo = row.getColumnDefinitions();
339 HashMap<String, Object> resultOutput = new HashMap<>();
340 for (Definition definition : colInfo) {
341 if (!definition.getName().equals("vector_ts")) {
342 if(definition.getType().toString().toLowerCase().contains("blob")) {
343 resultOutput.put(definition.getName(),
344 getBlobValue(row, definition.getName(), definition.getType()));
347 resultOutput.put(definition.getName(),
348 getColValue(row, definition.getName(), definition.getType()));
351 resultMap.put("row " + counter, resultOutput);
358 // Prepared Statements 1802 additions
360 * This Method performs DDL and DML operations on Cassandra using specified consistency level
362 * @param queryObject Object containing cassandra prepared query and values.
363 * @param consistency Specify consistency level for data synchronization across cassandra
365 * @return Boolean Indicates operation success or failure
366 * @throws MusicServiceException
367 * @throws MusicQueryException
369 public boolean executePut(PreparedQueryObject queryObject, String consistency)
370 throws MusicServiceException, MusicQueryException {
372 boolean result = false;
374 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
375 logger.error(EELFLoggerDelegate.errorLogger, queryObject.getQuery(),AppMessages.QUERYERROR, ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
376 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
377 + queryObject.getQuery() + "]");
379 logger.info(EELFLoggerDelegate.applicationLogger,
380 "In preprared Execute Put: the actual insert query:"
381 + queryObject.getQuery() + "; the values"
382 + queryObject.getValues());
383 PreparedStatement preparedInsert = null;
386 preparedInsert = session.prepare(queryObject.getQuery());
388 } catch(InvalidQueryException iqe) {
389 logger.error(EELFLoggerDelegate.errorLogger, iqe.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
390 throw new MusicQueryException(iqe.getMessage());
391 }catch(Exception e) {
392 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
393 throw new MusicQueryException(e.getMessage());
397 if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
398 logger.info(EELFLoggerDelegate.applicationLogger, "Executing critical put query");
399 preparedInsert.setConsistencyLevel(ConsistencyLevel.QUORUM);
400 } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
401 logger.info(EELFLoggerDelegate.applicationLogger, "Executing simple put query");
402 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
405 ResultSet rs = session.execute(preparedInsert.bind(queryObject.getValues().toArray()));
406 result = rs.wasApplied();
409 catch (AlreadyExistsException ae) {
410 logger.error(EELFLoggerDelegate.errorLogger, ae.getMessage(),AppMessages.SESSIONFAILED+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
411 throw new MusicServiceException(ae.getMessage());
413 catch (Exception e) {
414 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.SESSIONFAILED+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
415 throw new MusicQueryException("Executing Session Failure for Request = " + "["
416 + queryObject.getQuery() + "]" + " Reason = " + e.getMessage());
424 * This method performs DDL operations on Cassandra using consistency level ONE.
426 * @param queryObject Object containing cassandra prepared query and values.
428 * @throws MusicServiceException
429 * @throws MusicQueryException
431 public ResultSet executeEventualGet(PreparedQueryObject queryObject)
432 throws MusicServiceException, MusicQueryException {
434 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
435 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
436 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
437 + queryObject.getQuery() + "]");
439 logger.info(EELFLoggerDelegate.applicationLogger,
440 "Executing Eventual get query:" + queryObject.getQuery());
442 ResultSet results = null;
444 PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
445 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE);
446 results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
448 } catch (Exception ex) {
449 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
450 throw new MusicServiceException(ex.getMessage());
457 * This method performs DDL operation on Cassandra using consistency level QUORUM.
459 * @param queryObject Object containing cassandra prepared query and values.
461 * @throws MusicServiceException
462 * @throws MusicQueryException
464 public ResultSet executeCriticalGet(PreparedQueryObject queryObject)
465 throws MusicServiceException, MusicQueryException {
466 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
467 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
468 throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
469 + queryObject.getQuery() + "]");
471 logger.info(EELFLoggerDelegate.applicationLogger,
472 "Executing Critical get query:" + queryObject.getQuery());
473 PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
474 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.QUORUM);
475 ResultSet results = null;
477 results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
478 } catch (Exception ex) {
479 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
480 throw new MusicServiceException(ex.getMessage());