2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * ===================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 * ============LICENSE_END=============================================
20 * ====================================================================
22 package org.onap.music.datastore;
24 import java.net.InetAddress;
25 import java.net.NetworkInterface;
26 import java.net.SocketException;
27 import java.util.ArrayList;
28 import java.util.Enumeration;
29 import java.util.HashMap;
30 import java.util.Iterator;
32 import org.onap.music.eelf.logging.EELFLoggerDelegate;
33 import org.onap.music.eelf.logging.format.AppMessages;
34 import org.onap.music.eelf.logging.format.ErrorSeverity;
35 import org.onap.music.eelf.logging.format.ErrorTypes;
36 import org.onap.music.exceptions.MusicQueryException;
37 import org.onap.music.exceptions.MusicServiceException;
38 import org.onap.music.main.MusicUtil;
39 import com.datastax.driver.core.Cluster;
40 import com.datastax.driver.core.ColumnDefinitions;
41 import com.datastax.driver.core.ColumnDefinitions.Definition;
42 import com.datastax.driver.core.ConsistencyLevel;
43 import com.datastax.driver.core.DataType;
44 import com.datastax.driver.core.KeyspaceMetadata;
45 import com.datastax.driver.core.Metadata;
46 import com.datastax.driver.core.PreparedStatement;
47 import com.datastax.driver.core.ResultSet;
48 import com.datastax.driver.core.Row;
49 import com.datastax.driver.core.Session;
50 import com.datastax.driver.core.TableMetadata;
51 import com.datastax.driver.core.exceptions.AlreadyExistsException;
52 import com.datastax.driver.core.exceptions.InvalidQueryException;
53 import com.datastax.driver.core.exceptions.NoHostAvailableException;
59 public class MusicDataStore {
// Driver session through which this class prepares and executes statements.
61 private Session session;
// Driver cluster handle; this class builds it when connecting and reads keyspace/table metadata from it.
62 private Cluster cluster;
69 public void setSession(Session session) {
70 this.session = session;
// Accessor for the Cassandra Session held by this data store.
// NOTE(review): the method body is not visible in this view; presumably it returns the
// `session` field — confirm against the full file.
76 public Session getSession() {
83 public void setCluster(Cluster cluster) {
84 this.cluster = cluster;
// Logger for this class; calls in this file pass EELFLoggerDelegate.applicationLogger for
// informational output and EELFLoggerDelegate.errorLogger for failures.
89 private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class);
/**
 * Creates a data store and immediately attempts to connect to a Cassandra cluster by
 * delegating to the no-argument {@code connectToCassaCluster()}.
 */
public MusicDataStore() {
    this.connectToCassaCluster();
}
/**
 * Creates a data store around an already established cluster and session; no connection
 * attempt is made here.
 *
 * @param cluster the cluster handle to keep
 * @param session the session to keep
 */
public MusicDataStore(Cluster cluster, Session session) {
    this.cluster = cluster;
    this.session = session;
}
/**
 * Creates a data store and tries to connect it to the Cassandra node at the given address.
 *
 * <p>A {@link MusicServiceException} raised while connecting is NOT propagated: its message
 * is written to the error log and construction completes anyway.
 *
 * @param remoteIp address of the Cassandra contact point
 */
public MusicDataStore(String remoteIp) {
    try {
        connectToCassaCluster(remoteIp);
    } catch (MusicServiceException connectFailure) {
        logger.error(EELFLoggerDelegate.errorLogger, connectFailure.getMessage());
    }
}
125 private ArrayList<String> getAllPossibleLocalIps() {
126 ArrayList<String> allPossibleIps = new ArrayList<String>();
128 Enumeration<NetworkInterface> en = NetworkInterface.getNetworkInterfaces();
129 while (en.hasMoreElements()) {
130 NetworkInterface ni = (NetworkInterface) en.nextElement();
131 Enumeration<InetAddress> ee = ni.getInetAddresses();
132 while (ee.hasMoreElements()) {
133 InetAddress ia = (InetAddress) ee.nextElement();
134 allPossibleIps.add(ia.getHostAddress());
137 } catch (SocketException e) {
138 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.CONNCECTIVITYERROR, ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR);
139 }catch(Exception e) {
140 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), ErrorSeverity.ERROR, ErrorTypes.GENERALSERVICEERROR);
142 return allPossibleIps;
146 * This method iterates through all available IP addresses and connects to multiple cassandra
// Inside a loop guarded by it.hasNext(), builds a Cluster on port 9042 with the credentials
// from MusicUtil for the current `address` (initially "localhost"), logs the cluster name and
// opens a Session. A NoHostAvailableException is caught and logged.
// NOTE(review): the visible lines never advance `it`, never reassign `address`, and show no
// loop exit; lines appear to be missing from this view — confirm against the full file
// before changing the loop.
149 private void connectToCassaCluster() {
150 Iterator<String> it = getAllPossibleLocalIps().iterator();
151 String address = "localhost";
// The local IP list is computed a second time here, only for the log message.
152 logger.info(EELFLoggerDelegate.applicationLogger,
153 "Connecting to cassa cluster: Iterating through possible ips:"
154 + getAllPossibleLocalIps());
155 while (it.hasNext()) {
157 cluster = Cluster.builder().withPort(9042)
158 .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
159 .addContactPoint(address).build();
160 Metadata metadata = cluster.getMetadata();
161 logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
162 + metadata.getClusterName() + " at " + address);
163 session = cluster.connect();
// NOTE(review): the Cluster built above is not closed on this path in the visible code — confirm.
166 } catch (NoHostAvailableException e) {
168 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.HOSTUNAVAILABLE, ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR);
// Public close operation of this data store.
// NOTE(review): the method body is not visible in this view; presumably it closes the
// session and the cluster — confirm against the full file.
176 public void close() {
181 * This method connects to cassandra cluster on specific address.
185 private void connectToCassaCluster(String address) throws MusicServiceException {
186 cluster = Cluster.builder().withPort(9042)
187 .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
188 .addContactPoint(address).build();
189 Metadata metadata = cluster.getMetadata();
190 logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
191 + metadata.getClusterName() + " at " + address);
193 session = cluster.connect();
194 } catch (Exception ex) {
195 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.CASSANDRACONNECTIVITY, ErrorSeverity.ERROR, ErrorTypes.SERVICEUNAVAILABLE);
196 throw new MusicServiceException(
197 "Error while connecting to Cassandra cluster.. " + ex.getMessage());
208 public DataType returnColumnDataType(String keyspace, String tableName, String columnName) {
209 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
210 TableMetadata table = ks.getTable(tableName);
211 return table.getColumn(columnName).getType();
219 * @return TableMetadata
221 public TableMetadata returnColumnMetadata(String keyspace, String tableName) {
222 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
223 return ks.getTable(tableName);
228 * Utility function to return the Java specific object type.
// Reads column `colName` from `row` using the Row getter selected by colType.getName().
// NOTE(review): the case labels (and any default branch) are not visible in this view, so
// which DataType name selects which getter below cannot be confirmed from here.
235 public Object getColValue(Row row, String colName, DataType colType) {
237 switch (colType.getName()) {
239 return row.getString(colName);
241 return row.getUUID(colName);
243 return row.getVarint(colName);
245 return row.getLong(colName);
247 return row.getInt(colName);
249 return row.getFloat(colName);
251 return row.getDouble(colName);
253 return row.getBool(colName);
// Map columns are read with String keys and String values only.
255 return row.getMap(colName, String.class, String.class);
// List columns are read with String elements only.
257 return row.getList(colName, String.class);
263 public boolean doesRowSatisfyCondition(Row row, Map<String, Object> condition) throws Exception {
264 ColumnDefinitions colInfo = row.getColumnDefinitions();
266 for (Map.Entry<String, Object> entry : condition.entrySet()) {
267 String colName = entry.getKey();
268 DataType colType = colInfo.getType(colName);
269 Object columnValue = getColValue(row, colName, colType);
270 Object conditionValue = MusicUtil.convertToActualDataType(colType, entry.getValue());
271 if (columnValue.equals(conditionValue) == false)
278 * Utility function to store ResultSet values in to a MAP for output.
// For each row of `results`, builds a HashMap of column name -> value (read via getColValue)
// and stores it in the outer map under the key "row " + counter.
// NOTE(review): the declaration/initial value and update of `counter`, and the return
// statement, are not visible in this view — confirm against the full file.
283 public Map<String, HashMap<String, Object>> marshalData(ResultSet results) {
284 Map<String, HashMap<String, Object>> resultMap =
285 new HashMap<String, HashMap<String, Object>>();
287 for (Row row : results) {
288 ColumnDefinitions colInfo = row.getColumnDefinitions();
289 HashMap<String, Object> resultOutput = new HashMap<String, Object>();
290 for (Definition definition : colInfo) {
// The "vector_ts" column is left out of the per-row output.
291 if (!definition.getName().equals("vector_ts"))
292 resultOutput.put(definition.getName(),
293 getColValue(row, definition.getName(), definition.getType()));
295 resultMap.put("row " + counter, resultOutput);
302 // Prepared Statements 1802 additions
304 * This Method performs DDL and DML operations on Cassandra using specified consistency level
306 * @param queryObject Object containing cassandra prepared query and values.
307 * @param consistency Specify consistency level for data synchronization across cassandra
309 * @return Boolean Indicates operation success or failure
310 * @throws MusicServiceException
311 * @throws MusicQueryException
313 public boolean executePut(PreparedQueryObject queryObject, String consistency)
314 throws MusicServiceException, MusicQueryException {
316 boolean result = false;
318 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
319 logger.error(EELFLoggerDelegate.errorLogger, queryObject.getQuery(),AppMessages.QUERYERROR, ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
320 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
321 + queryObject.getQuery() + "]");
323 logger.info(EELFLoggerDelegate.applicationLogger,
324 "In preprared Execute Put: the actual insert query:"
325 + queryObject.getQuery() + "; the values"
326 + queryObject.getValues());
327 PreparedStatement preparedInsert = null;
329 preparedInsert = session.prepare(queryObject.getQuery());
330 } catch(InvalidQueryException iqe) {
331 logger.error(EELFLoggerDelegate.errorLogger, iqe.getMessage());
332 throw new MusicQueryException(iqe.getMessage());
336 if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
337 logger.info(EELFLoggerDelegate.applicationLogger, "Executing critical put query");
338 preparedInsert.setConsistencyLevel(ConsistencyLevel.QUORUM);
339 } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
340 logger.info(EELFLoggerDelegate.applicationLogger, "Executing simple put query");
341 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
344 ResultSet rs = session.execute(preparedInsert.bind(queryObject.getValues().toArray()));
345 result = rs.wasApplied();
348 catch (AlreadyExistsException ae) {
349 logger.error(EELFLoggerDelegate.errorLogger, ae.getMessage(),AppMessages.SESSIONFAILED+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
350 throw new MusicServiceException(ae.getMessage());
352 catch (Exception e) {
353 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.SESSIONFAILED+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
354 throw new MusicQueryException("Executing Session Failure for Request = " + "["
355 + queryObject.getQuery() + "]" + " Reason = " + e.getMessage());
363 * This method performs DDL operations on Cassandra using consistency level ONE.
365 * @param queryObject Object containing cassandra prepared query and values.
367 * @throws MusicServiceException
368 * @throws MusicQueryException
370 public ResultSet executeEventualGet(PreparedQueryObject queryObject)
371 throws MusicServiceException, MusicQueryException {
373 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
374 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
375 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
376 + queryObject.getQuery() + "]");
378 logger.info(EELFLoggerDelegate.applicationLogger,
379 "Executing Eventual get query:" + queryObject.getQuery());
381 ResultSet results = null;
383 PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
384 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE);
385 results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
387 } catch (Exception ex) {
388 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
389 throw new MusicServiceException(ex.getMessage());
396 * This method performs DDL operation on Cassandra using consistency level QUORUM.
398 * @param queryObject Object containing cassandra prepared query and values.
400 * @throws MusicServiceException
401 * @throws MusicQueryException
403 public ResultSet executeCriticalGet(PreparedQueryObject queryObject)
404 throws MusicServiceException, MusicQueryException {
405 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
406 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
407 throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
408 + queryObject.getQuery() + "]");
410 logger.info(EELFLoggerDelegate.applicationLogger,
411 "Executing Critical get query:" + queryObject.getQuery());
412 PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
413 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.QUORUM);
414 ResultSet results = null;
416 results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
417 } catch (Exception ex) {
418 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
419 throw new MusicServiceException(ex.getMessage());