2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * ===================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 * ============LICENSE_END=============================================
20 * ====================================================================
22 package org.onap.music.datastore;
24 import java.net.InetAddress;
25 import java.net.NetworkInterface;
26 import java.net.SocketException;
27 import java.util.ArrayList;
28 import java.util.Enumeration;
29 import java.util.HashMap;
30 import java.util.Iterator;
32 import org.onap.music.eelf.logging.EELFLoggerDelegate;
33 import org.onap.music.exceptions.MusicQueryException;
34 import org.onap.music.exceptions.MusicServiceException;
35 import org.onap.music.main.MusicUtil;
36 import org.onap.music.main.ResultType;
38 import com.datastax.driver.core.Cluster;
39 import com.datastax.driver.core.ColumnDefinitions;
40 import com.datastax.driver.core.ColumnDefinitions.Definition;
41 import com.datastax.driver.core.ConsistencyLevel;
42 import com.datastax.driver.core.DataType;
43 import com.datastax.driver.core.KeyspaceMetadata;
44 import com.datastax.driver.core.Metadata;
45 import com.datastax.driver.core.PreparedStatement;
46 import com.datastax.driver.core.ResultSet;
47 import com.datastax.driver.core.Row;
48 import com.datastax.driver.core.Session;
49 import com.datastax.driver.core.TableMetadata;
50 import com.datastax.driver.core.exceptions.AlreadyExistsException;
51 import com.datastax.driver.core.exceptions.InvalidQueryException;
52 import com.datastax.driver.core.exceptions.NoHostAvailableException;
// Cassandra data-store helper: keeps a driver Cluster/Session pair and builds its
// connection, schema-metadata and prepared-statement helpers on top of them.
58 public class MusicDataStore {
// Session on which the execute* methods prepare and run their statements.
60 private Session session;
// Cluster the session is obtained from; also queried for keyspace/table metadata.
61 private Cluster cluster;
68 public void setSession(Session session) {
69 this.session = session;
// NOTE(review): only the signature of getSession() is visible in this excerpt;
// presumably it returns the `session` field — confirm against the full source.
75 public Session getSession() {
82 public void setCluster(Cluster cluster) {
83 this.cluster = cluster;
// Per-instance logger for this class; the methods here log through
// EELFLoggerDelegate.applicationLogger (info) and EELFLoggerDelegate.errorLogger (errors).
88 private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class);
/**
 * Creates a data store and immediately attempts to connect to a Cassandra cluster
 * through {@code connectToCassaCluster()}.
 */
public MusicDataStore() {
    this.connectToCassaCluster();
}
/**
 * Creates a data store around an already established cluster and session; no
 * connection attempt is made.
 *
 * @param cluster the cluster handle to use
 * @param session the session to use
 */
public MusicDataStore(Cluster cluster, Session session) {
    this.cluster = cluster;
    this.session = session;
}
/**
 * Creates a data store and attempts to connect to the Cassandra node at the given
 * address. A {@link MusicServiceException} raised while connecting is logged to the
 * error logger and NOT rethrown, so construction completes even when the connection
 * attempt failed.
 *
 * @param remoteIp contact point passed to {@code connectToCassaCluster(String)}
 */
public MusicDataStore(String remoteIp) {
    try {
        this.connectToCassaCluster(remoteIp);
    } catch (MusicServiceException connectFailure) {
        logger.error(EELFLoggerDelegate.errorLogger, connectFailure.getMessage());
    }
}
124 private ArrayList<String> getAllPossibleLocalIps() {
125 ArrayList<String> allPossibleIps = new ArrayList<String>();
127 Enumeration<NetworkInterface> en = NetworkInterface.getNetworkInterfaces();
128 while (en.hasMoreElements()) {
129 NetworkInterface ni = (NetworkInterface) en.nextElement();
130 Enumeration<InetAddress> ee = ni.getInetAddresses();
131 while (ee.hasMoreElements()) {
132 InetAddress ia = (InetAddress) ee.nextElement();
133 allPossibleIps.add(ia.getHostAddress());
136 } catch (SocketException e) {
137 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage());
139 return allPossibleIps;
143 * This method iterates through all available IP addresses and connects to multiple cassandra
146 private void connectToCassaCluster() {
147 Iterator<String> it = getAllPossibleLocalIps().iterator();
148 String address = "localhost";
149 logger.info(EELFLoggerDelegate.applicationLogger,
150 "Connecting to cassa cluster: Iterating through possible ips:"
151 + getAllPossibleLocalIps());
152 while (it.hasNext()) {
154 cluster = Cluster.builder().withPort(9042)
155 .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
156 .addContactPoint(address).build();
157 Metadata metadata = cluster.getMetadata();
158 logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
159 + metadata.getClusterName() + " at " + address);
160 session = cluster.connect();
163 } catch (NoHostAvailableException e) {
165 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage());
// NOTE(review): only the signature of close() is visible in this excerpt; presumably it
// releases the session and/or cluster — confirm against the full source.
173 public void close() {
178 * This method connects to cassandra cluster on specific address.
182 private void connectToCassaCluster(String address) throws MusicServiceException {
183 cluster = Cluster.builder().withPort(9042)
184 .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
185 .addContactPoint(address).build();
186 Metadata metadata = cluster.getMetadata();
187 logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
188 + metadata.getClusterName() + " at " + address);
190 session = cluster.connect();
191 } catch (Exception ex) {
192 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage());
193 throw new MusicServiceException(
194 "Error while connecting to Cassandra cluster.. " + ex.getMessage());
205 public DataType returnColumnDataType(String keyspace, String tableName, String columnName) {
206 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
207 TableMetadata table = ks.getTable(tableName);
208 return table.getColumn(columnName).getType();
216 * @return TableMetadata
218 public TableMetadata returnColumnMetadata(String keyspace, String tableName) {
219 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
220 return ks.getTable(tableName);
225 * Utility function to return the Java specific object type.
// Returns the value of column colName from row as a Java object. The typed Row getter
// (getString, getUUID, getVarint, getLong, getInt, getFloat, getDouble, getBool, or
// getMap with String keys and String values) is chosen by switching on colType.getName().
// NOTE(review): the case labels of the switch are not visible in this excerpt, so which
// DataType name selects which getter — and what happens for any other type — cannot be
// confirmed here; verify against the full source.
232 public Object getColValue(Row row, String colName, DataType colType) {
234 switch (colType.getName()) {
236 return row.getString(colName);
238 return row.getUUID(colName);
240 return row.getVarint(colName);
242 return row.getLong(colName);
244 return row.getInt(colName);
246 return row.getFloat(colName);
248 return row.getDouble(colName);
250 return row.getBool(colName);
252 return row.getMap(colName, String.class, String.class);
258 public boolean doesRowSatisfyCondition(Row row, Map<String, Object> condition) throws Exception {
259 ColumnDefinitions colInfo = row.getColumnDefinitions();
261 for (Map.Entry<String, Object> entry : condition.entrySet()) {
262 String colName = entry.getKey();
263 DataType colType = colInfo.getType(colName);
264 Object columnValue = getColValue(row, colName, colType);
265 Object conditionValue = MusicUtil.convertToActualDataType(colType, entry.getValue());
266 if (columnValue.equals(conditionValue) == false)
273 * Utility function to store ResultSet values in to a MAP for output.
// Converts a ResultSet into a map holding one entry per row. The entry key is
// "row " + counter; the entry value maps each column name to the value read through
// getColValue for that column's declared type.
// NOTE(review): the declaration and update of `counter` are not visible in this
// excerpt — presumably a running row index; confirm its start value in the full source.
278 public Map<String, HashMap<String, Object>> marshalData(ResultSet results) {
279 Map<String, HashMap<String, Object>> resultMap =
280 new HashMap<String, HashMap<String, Object>>();
282 for (Row row : results) {
283 ColumnDefinitions colInfo = row.getColumnDefinitions();
284 HashMap<String, Object> resultOutput = new HashMap<String, Object>();
285 for (Definition definition : colInfo) {
// The "vector_ts" column is deliberately left out of the per-row output map.
286 if (!definition.getName().equals("vector_ts"))
287 resultOutput.put(definition.getName(),
288 getColValue(row, definition.getName(), definition.getType()));
290 resultMap.put("row " + counter, resultOutput);
297 // Prepared Statements 1802 additions
299 * This Method performs DDL and DML operations on Cassandra using specified consistency level
301 * @param queryObject Object containing cassandra prepared query and values.
302 * @param consistency Specify consistency level for data synchronization across cassandra
304 * @return Boolean Indicates operation success or failure
305 * @throws MusicServiceException
306 * @throws MusicQueryException
308 public boolean executePut(PreparedQueryObject queryObject, String consistency)
309 throws MusicServiceException, MusicQueryException {
311 boolean result = false;
313 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
314 logger.error(EELFLoggerDelegate.errorLogger,
315 "Error while processing prepared query object");
316 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
317 + queryObject.getQuery() + "]");
319 logger.info(EELFLoggerDelegate.applicationLogger,
320 "In preprared Execute Put: the actual insert query:"
321 + queryObject.getQuery() + "; the values"
322 + queryObject.getValues());
323 PreparedStatement preparedInsert = null;
325 preparedInsert = session.prepare(queryObject.getQuery());
326 } catch(InvalidQueryException iqe) {
327 logger.error(EELFLoggerDelegate.errorLogger, iqe.getMessage());
328 throw new MusicQueryException(iqe.getMessage());
332 if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
333 logger.info(EELFLoggerDelegate.applicationLogger, "Executing critical put query");
334 preparedInsert.setConsistencyLevel(ConsistencyLevel.QUORUM);
335 } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
336 logger.info(EELFLoggerDelegate.applicationLogger, "Executing simple put query");
337 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
340 ResultSet rs = session.execute(preparedInsert.bind(queryObject.getValues().toArray()));
341 result = rs.wasApplied();
344 catch (AlreadyExistsException ae) {
345 logger.error(EELFLoggerDelegate.errorLogger, "Executing Session Failure for Request = "
346 + "[" + queryObject.getQuery() + "]" + " Reason = " + ae.getMessage());
347 throw new MusicServiceException(ae.getMessage());
349 catch (Exception e) {
350 logger.error(EELFLoggerDelegate.errorLogger, "Executing Session Failure for Request = "
351 + "[" + queryObject.getQuery() + "]" + " Reason = " + e.getMessage());
352 throw new MusicServiceException("Executing Session Failure for Request = " + "["
353 + queryObject.getQuery() + "]" + " Reason = " + e.getMessage());
361 * This method performs DDL operations on Cassandra using consistency level ONE.
363 * @param queryObject Object containing cassandra prepared query and values.
365 * @throws MusicServiceException
366 * @throws MusicQueryException
368 public ResultSet executeEventualGet(PreparedQueryObject queryObject)
369 throws MusicServiceException, MusicQueryException {
371 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
372 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
373 + queryObject.getQuery() + "]");
375 logger.info(EELFLoggerDelegate.applicationLogger,
376 "Executing Eventual get query:" + queryObject.getQuery());
377 PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
378 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE);
379 ResultSet results = null;
381 results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
383 } catch (Exception ex) {
384 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage());
385 throw new MusicServiceException(ex.getMessage());
392 * This method performs DDL operation on Cassandra using consistency level QUORUM.
394 * @param queryObject Object containing cassandra prepared query and values.
396 * @throws MusicServiceException
397 * @throws MusicQueryException
399 public ResultSet executeCriticalGet(PreparedQueryObject queryObject)
400 throws MusicServiceException, MusicQueryException {
401 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
402 logger.error(EELFLoggerDelegate.errorLogger, "Error processing Prepared Query Object");
403 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
404 + queryObject.getQuery() + "]");
406 logger.info(EELFLoggerDelegate.applicationLogger,
407 "Executing Critical get query:" + queryObject.getQuery());
408 PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
409 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.QUORUM);
410 ResultSet results = null;
412 results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
413 } catch (Exception ex) {
414 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage());
415 throw new MusicServiceException(ex.getMessage());