2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * ===================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 * ============LICENSE_END=============================================
20 * ====================================================================
22 package org.onap.music.datastore;
24 import java.net.InetAddress;
25 import java.net.NetworkInterface;
26 import java.net.SocketException;
27 import java.util.ArrayList;
28 import java.util.Enumeration;
29 import java.util.HashMap;
30 import java.util.Iterator;
32 import org.onap.music.eelf.logging.EELFLoggerDelegate;
33 import org.onap.music.exceptions.MusicQueryException;
34 import org.onap.music.exceptions.MusicServiceException;
35 import org.onap.music.main.MusicUtil;
36 import com.datastax.driver.core.Cluster;
37 import com.datastax.driver.core.ColumnDefinitions;
38 import com.datastax.driver.core.ColumnDefinitions.Definition;
39 import com.datastax.driver.core.ConsistencyLevel;
40 import com.datastax.driver.core.DataType;
41 import com.datastax.driver.core.KeyspaceMetadata;
42 import com.datastax.driver.core.Metadata;
43 import com.datastax.driver.core.PreparedStatement;
44 import com.datastax.driver.core.ResultSet;
45 import com.datastax.driver.core.Row;
46 import com.datastax.driver.core.Session;
47 import com.datastax.driver.core.TableMetadata;
48 import com.datastax.driver.core.exceptions.NoHostAvailableException;
54 public class MusicDataStore {
56 private Session session;
57 private Cluster cluster;
64 public void setSession(Session session) {
65 this.session = session;
71 public void setCluster(Cluster cluster) {
72 this.cluster = cluster;
77 private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class);
/**
 * Creates a data store and immediately attempts to connect to a Cassandra cluster by
 * delegating to the no-argument {@code connectToCassaCluster()}.
 */
public MusicDataStore() {
    this.connectToCassaCluster();
}
/**
 * Creates a data store around an already established cluster and session; no connection
 * attempt is made here.
 *
 * @param cluster the cluster handle to use
 * @param session the session to use
 */
public MusicDataStore(Cluster cluster, Session session) {
    this.cluster = cluster;
    this.session = session;
}
/**
 * Creates a data store connected to the Cassandra node at the given address.
 *
 * <p>A {@link MusicServiceException} raised while connecting is not propagated: its message
 * is written to the error log and construction completes normally.
 *
 * @param remoteIp address of the Cassandra contact point
 */
public MusicDataStore(String remoteIp) {
    try {
        this.connectToCassaCluster(remoteIp);
    } catch (MusicServiceException connectFailure) {
        logger.error(EELFLoggerDelegate.errorLogger, connectFailure.getMessage());
    }
}
113 private ArrayList<String> getAllPossibleLocalIps() {
114 ArrayList<String> allPossibleIps = new ArrayList<String>();
116 Enumeration<NetworkInterface> en = NetworkInterface.getNetworkInterfaces();
117 while (en.hasMoreElements()) {
118 NetworkInterface ni = (NetworkInterface) en.nextElement();
119 Enumeration<InetAddress> ee = ni.getInetAddresses();
120 while (ee.hasMoreElements()) {
121 InetAddress ia = (InetAddress) ee.nextElement();
122 allPossibleIps.add(ia.getHostAddress());
125 } catch (SocketException e) {
126 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage());
128 return allPossibleIps;
132 * This method iterates through all available IP addresses and connects to multiple cassandra
135 private void connectToCassaCluster() {
136 Iterator<String> it = getAllPossibleLocalIps().iterator();
137 String address = "localhost";
138 logger.info(EELFLoggerDelegate.applicationLogger,
139 "Connecting to cassa cluster: Iterating through possible ips:"
140 + getAllPossibleLocalIps());
141 while (it.hasNext()) {
143 cluster = Cluster.builder().withPort(9042)
144 .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
145 .addContactPoint(address).build();
146 Metadata metadata = cluster.getMetadata();
147 logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
148 + metadata.getClusterName() + " at " + address);
149 session = cluster.connect();
152 } catch (NoHostAvailableException e) {
154 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage());
// NOTE(review): the statements of close() are not visible in this excerpt. Presumably it shuts
// down the session and/or cluster held by this data store - confirm against the full source.
162 public void close() {
167 * This method connects to cassandra cluster on specific address.
171 private void connectToCassaCluster(String address) throws MusicServiceException {
172 cluster = Cluster.builder().withPort(9042)
173 .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
174 .addContactPoint(address).build();
175 Metadata metadata = cluster.getMetadata();
176 logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
177 + metadata.getClusterName() + " at " + address);
179 session = cluster.connect();
180 } catch (Exception ex) {
181 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage());
182 throw new MusicServiceException(
183 "Error while connecting to Cassandra cluster.. " + ex.getMessage());
194 public DataType returnColumnDataType(String keyspace, String tableName, String columnName) {
195 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
196 TableMetadata table = ks.getTable(tableName);
197 return table.getColumn(columnName).getType();
205 * @return TableMetadata
207 public TableMetadata returnColumnMetadata(String keyspace, String tableName) {
208 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
209 return ks.getTable(tableName);
214 * Utility function to return the Java specific object type.
// Reads column colName from row with the Row getter selected by the name of colType and
// returns the result as a plain Object.
// NOTE(review): the case labels of this switch (and any default branch) are not visible in
// this excerpt, so which type name selects which getter cannot be confirmed from here -
// verify against the full source before relying on the mapping.
221 public Object getColValue(Row row, String colName, DataType colType) {
// Dispatch on the type name of the column.
223 switch (colType.getName()) {
225 return row.getString(colName);
227 return row.getUUID(colName);
229 return row.getVarint(colName);
231 return row.getLong(colName);
233 return row.getInt(colName);
235 return row.getFloat(colName);
237 return row.getDouble(colName);
239 return row.getBool(colName);
// Map columns are read with String keys and String values.
241 return row.getMap(colName, String.class, String.class);
247 public boolean doesRowSatisfyCondition(Row row, Map<String, Object> condition) {
248 ColumnDefinitions colInfo = row.getColumnDefinitions();
250 for (Map.Entry<String, Object> entry : condition.entrySet()) {
251 String colName = entry.getKey();
252 DataType colType = colInfo.getType(colName);
253 Object columnValue = getColValue(row, colName, colType);
254 Object conditionValue = MusicUtil.convertToActualDataType(colType, entry.getValue());
255 if (columnValue.equals(conditionValue) == false)
262 * Utility function to store ResultSet values in to a MAP for output.
267 public Map<String, HashMap<String, Object>> marshalData(ResultSet results) {
268 Map<String, HashMap<String, Object>> resultMap =
269 new HashMap<String, HashMap<String, Object>>();
271 for (Row row : results) {
272 ColumnDefinitions colInfo = row.getColumnDefinitions();
273 HashMap<String, Object> resultOutput = new HashMap<String, Object>();
274 for (Definition definition : colInfo) {
275 if (!definition.getName().equals("vector_ts"))
276 resultOutput.put(definition.getName(),
277 getColValue(row, definition.getName(), definition.getType()));
279 resultMap.put("row " + counter, resultOutput);
286 // Prepared Statements 1802 additions
288 * This Method performs DDL and DML operations on Cassandra using specified consistency level
290 * @param queryObject Object containing cassandra prepared query and values.
291 * @param consistency Specify consistency level for data synchronization across cassandra
293 * @return Boolean Indicates operation success or failure
294 * @throws MusicServiceException
295 * @throws MusicQueryException
297 public boolean executePut(PreparedQueryObject queryObject, String consistency)
298 throws MusicServiceException, MusicQueryException {
300 boolean result = false;
302 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
303 logger.error(EELFLoggerDelegate.errorLogger,
304 "Error while processing prepared query object");
305 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
306 + queryObject.getQuery() + "]");
308 logger.info(EELFLoggerDelegate.applicationLogger,
309 "In preprared Execute Put: the actual insert query:"
310 + queryObject.getQuery() + "; the values"
311 + queryObject.getValues());
312 PreparedStatement preparedInsert = session.prepare(queryObject.getQuery());
314 if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
315 logger.info(EELFLoggerDelegate.applicationLogger, "Executing critical put query");
316 preparedInsert.setConsistencyLevel(ConsistencyLevel.QUORUM);
317 } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
318 logger.info(EELFLoggerDelegate.applicationLogger, "Executing simple put query");
319 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
322 ResultSet rs = session.execute(preparedInsert.bind(queryObject.getValues().toArray()));
323 result = rs.wasApplied();
325 } catch (Exception e) {
326 logger.error(EELFLoggerDelegate.errorLogger, "Executing Session Failure for Request = "
327 + "[" + queryObject.getQuery() + "]" + " Reason = " + e.getMessage());
328 throw new MusicServiceException("Executing Session Failure for Request = " + "["
329 + queryObject.getQuery() + "]" + " Reason = " + e.getMessage());
337 * This method performs DDL operations on Cassandra using consistency level ONE.
339 * @param queryObject Object containing cassandra prepared query and values.
341 * @throws MusicServiceException
342 * @throws MusicQueryException
344 public ResultSet executeEventualGet(PreparedQueryObject queryObject)
345 throws MusicServiceException, MusicQueryException {
347 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
348 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
349 + queryObject.getQuery() + "]");
351 logger.info(EELFLoggerDelegate.applicationLogger,
352 "Executing Eventual get query:" + queryObject.getQuery());
353 PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
354 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE);
355 ResultSet results = null;
357 results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
359 } catch (Exception ex) {
360 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage());
361 throw new MusicServiceException(ex.getMessage());
368 * This method performs DDL operation on Cassandra using consistency level QUORUM.
370 * @param queryObject Object containing cassandra prepared query and values.
372 * @throws MusicServiceException
373 * @throws MusicQueryException
375 public ResultSet executeCriticalGet(PreparedQueryObject queryObject)
376 throws MusicServiceException, MusicQueryException {
377 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
378 logger.error(EELFLoggerDelegate.errorLogger, "Error processing Prepared Query Object");
379 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
380 + queryObject.getQuery() + "]");
382 logger.info(EELFLoggerDelegate.applicationLogger,
383 "Executing Critical get query:" + queryObject.getQuery());
384 PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
385 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.QUORUM);
386 ResultSet results = null;
388 results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
389 } catch (Exception ex) {
390 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage());
391 throw new MusicServiceException(ex.getMessage());