2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * ===================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 * ============LICENSE_END=============================================
20 * ====================================================================
22 package org.onap.music.datastore;
24 import java.net.InetAddress;
25 import java.net.NetworkInterface;
26 import java.net.SocketException;
27 import java.util.ArrayList;
28 import java.util.Enumeration;
29 import java.util.HashMap;
30 import java.util.Iterator;
32 import org.onap.music.exceptions.MusicQueryException;
33 import org.onap.music.exceptions.MusicServiceException;
34 import org.onap.music.main.MusicUtil;
35 import com.att.eelf.configuration.EELFLogger;
36 import com.att.eelf.configuration.EELFManager;
37 import com.datastax.driver.core.Cluster;
38 import com.datastax.driver.core.ColumnDefinitions;
39 import com.datastax.driver.core.ColumnDefinitions.Definition;
40 import com.datastax.driver.core.ConsistencyLevel;
41 import com.datastax.driver.core.DataType;
42 import com.datastax.driver.core.KeyspaceMetadata;
43 import com.datastax.driver.core.Metadata;
44 import com.datastax.driver.core.PreparedStatement;
45 import com.datastax.driver.core.ResultSet;
46 import com.datastax.driver.core.Row;
47 import com.datastax.driver.core.Session;
48 import com.datastax.driver.core.TableMetadata;
49 import com.datastax.driver.core.exceptions.NoHostAvailableException;
55 public class MusicDataStore {
56 private Session session;
57 private Cluster cluster;
64 public void setSession(Session session) {
65 this.session = session;
71 public void setCluster(Cluster cluster) {
72 this.cluster = cluster;
78 private static EELFLogger logger = EELFManager.getInstance().getLogger(MusicDataStore.class);
83 public MusicDataStore() {
84 connectToCassaCluster();
92 public MusicDataStore(Cluster cluster, Session session) {
93 this.session = session;
94 this.cluster = cluster;
100 * @throws MusicServiceException
102 public MusicDataStore(String remoteIp) {
104 connectToCassaCluster(remoteIp);
105 } catch (MusicServiceException e) {
106 logger.error(e.getMessage());
114 private ArrayList<String> getAllPossibleLocalIps() {
115 ArrayList<String> allPossibleIps = new ArrayList<String>();
117 Enumeration<NetworkInterface> en = NetworkInterface.getNetworkInterfaces();
118 while (en.hasMoreElements()) {
119 NetworkInterface ni = (NetworkInterface) en.nextElement();
120 Enumeration<InetAddress> ee = ni.getInetAddresses();
121 while (ee.hasMoreElements()) {
122 InetAddress ia = (InetAddress) ee.nextElement();
123 allPossibleIps.add(ia.getHostAddress());
126 } catch (SocketException e) {
127 logger.error(e.getMessage());
129 return allPossibleIps;
133 * This method iterates through all available IP addresses and connects to multiple cassandra
136 private void connectToCassaCluster() {
137 Iterator<String> it = getAllPossibleLocalIps().iterator();
138 String address = "localhost";
139 logger.info("Connecting to cassa cluster: Iterating through possible ips:"
140 + getAllPossibleLocalIps());
141 while (it.hasNext()) {
143 cluster = Cluster.builder().withPort(9042)
144 .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
145 .addContactPoint(address).build();
146 Metadata metadata = cluster.getMetadata();
147 logger.info("Connected to cassa cluster " + metadata.getClusterName() + " at "
149 session = cluster.connect();
152 } catch (NoHostAvailableException e) {
154 logger.error(e.getMessage());
162 public void close() {
167 * This method connects to cassandra cluster on specific address.
171 private void connectToCassaCluster(String address) throws MusicServiceException {
172 cluster = Cluster.builder().withPort(9042)
173 .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
174 .addContactPoint(address).build();
175 Metadata metadata = cluster.getMetadata();
176 logger.info("Connected to cassa cluster " + metadata.getClusterName() + " at " + address);
178 session = cluster.connect();
179 } catch (Exception ex) {
180 logger.error(ex.getMessage());
181 throw new MusicServiceException(
182 "Error while connecting to Cassandra cluster.. " + ex.getMessage());
193 public DataType returnColumnDataType(String keyspace, String tableName, String columnName) {
194 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
195 TableMetadata table = ks.getTable(tableName);
196 return table.getColumn(columnName).getType();
204 * @return TableMetadata
206 public TableMetadata returnColumnMetadata(String keyspace, String tableName) {
207 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
208 return ks.getTable(tableName);
213 * Utility function to return the Java specific object type.
220 public Object getColValue(Row row, String colName, DataType colType) {
222 switch (colType.getName()) {
224 return row.getString(colName);
226 return row.getUUID(colName);
228 return row.getVarint(colName);
230 return row.getLong(colName);
232 return row.getInt(colName);
234 return row.getFloat(colName);
236 return row.getDouble(colName);
238 return row.getBool(colName);
240 return row.getMap(colName, String.class, String.class);
246 public boolean doesRowSatisfyCondition(Row row, Map<String, Object> condition) {
247 ColumnDefinitions colInfo = row.getColumnDefinitions();
249 for (Map.Entry<String, Object> entry : condition.entrySet()) {
250 String colName = entry.getKey();
251 DataType colType = colInfo.getType(colName);
252 Object columnValue = getColValue(row, colName, colType);
253 Object conditionValue = MusicUtil.convertToActualDataType(colType, entry.getValue());
254 if (columnValue.equals(conditionValue) == false)
261 * Utility function to store ResultSet values in to a MAP for output.
266 public Map<String, HashMap<String, Object>> marshalData(ResultSet results) {
267 Map<String, HashMap<String, Object>> resultMap =
268 new HashMap<String, HashMap<String, Object>>();
270 for (Row row : results) {
271 ColumnDefinitions colInfo = row.getColumnDefinitions();
272 HashMap<String, Object> resultOutput = new HashMap<String, Object>();
273 for (Definition definition : colInfo) {
274 if (!definition.getName().equals("vector_ts"))
275 resultOutput.put(definition.getName(),
276 getColValue(row, definition.getName(), definition.getType()));
278 resultMap.put("row " + counter, resultOutput);
285 // Prepared Statements 1802 additions
287 * This Method performs DDL and DML operations on Cassandra using specified consistency level
289 * @param queryObject Object containing cassandra prepared query and values.
290 * @param consistency Specify consistency level for data synchronization across cassandra
292 * @return Boolean Indicates operation success or failure
293 * @throws MusicServiceException
294 * @throws MusicQueryException
296 public boolean executePut(PreparedQueryObject queryObject, String consistency)
297 throws MusicServiceException, MusicQueryException {
299 boolean result = false;
301 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
302 logger.error("Error while processing prepared query object");
303 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
304 + queryObject.getQuery() + "]");
306 logger.info("In preprared Execute Put: the actual insert query:" + queryObject.getQuery()
307 + "; the values" + queryObject.getValues());
308 PreparedStatement preparedInsert = session.prepare(queryObject.getQuery());
310 if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
311 logger.info("Executing critical put query");
312 preparedInsert.setConsistencyLevel(ConsistencyLevel.QUORUM);
313 } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
314 logger.info("Executing simple put query");
315 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
318 session.execute(preparedInsert.bind(queryObject.getValues().toArray()));
320 } catch (Exception e) {
321 logger.error("Executing Session Failure for Request = " + "[" + queryObject.getQuery()
322 + "]" + " Reason = " + e.getMessage());
323 throw new MusicServiceException("Executing Session Failure for Request = " + "["
324 + queryObject.getQuery() + "]" + " Reason = " + e.getMessage());
332 * This method performs DDL operations on Cassandra using consistency level ONE.
334 * @param queryObject Object containing cassandra prepared query and values.
336 * @throws MusicServiceException
337 * @throws MusicQueryException
339 public ResultSet executeEventualGet(PreparedQueryObject queryObject)
340 throws MusicServiceException, MusicQueryException {
342 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
343 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
344 + queryObject.getQuery() + "]");
346 logger.info("Executing Eventual get query:" + queryObject.getQuery());
347 PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
348 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE);
349 ResultSet results = null;
351 results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
353 } catch (Exception ex) {
354 logger.error(ex.getMessage());
355 throw new MusicServiceException(ex.getMessage());
362 * This method performs DDL operation on Cassandra using consistency level QUORUM.
364 * @param queryObject Object containing cassandra prepared query and values.
366 * @throws MusicServiceException
367 * @throws MusicQueryException
369 public ResultSet executeCriticalGet(PreparedQueryObject queryObject)
370 throws MusicServiceException, MusicQueryException {
371 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
372 logger.error("Error processing Prepared Query Object");
373 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
374 + queryObject.getQuery() + "]");
376 logger.info("Executing Critical get query:" + queryObject.getQuery());
377 PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
378 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.QUORUM);
379 ResultSet results = null;
381 results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
382 } catch (Exception ex) {
383 logger.error(ex.getMessage());
384 throw new MusicServiceException(ex.getMessage());