2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * ===================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 * ============LICENSE_END=============================================
20 * ====================================================================
23 package org.onap.music.datastore;
25 import java.net.InetAddress;
26 import java.net.NetworkInterface;
27 import java.net.SocketException;
28 import java.util.ArrayList;
29 import java.util.Enumeration;
30 import java.util.HashMap;
31 import java.util.Iterator;
33 import org.onap.music.eelf.logging.EELFLoggerDelegate;
34 import org.onap.music.eelf.logging.format.AppMessages;
35 import org.onap.music.eelf.logging.format.ErrorSeverity;
36 import org.onap.music.eelf.logging.format.ErrorTypes;
37 import org.onap.music.exceptions.MusicQueryException;
38 import org.onap.music.exceptions.MusicServiceException;
39 import org.onap.music.main.MusicUtil;
40 import com.datastax.driver.core.Cluster;
41 import com.datastax.driver.core.ColumnDefinitions;
42 import com.datastax.driver.core.ColumnDefinitions.Definition;
43 import com.datastax.driver.core.ConsistencyLevel;
44 import com.datastax.driver.core.DataType;
45 import com.datastax.driver.core.KeyspaceMetadata;
46 import com.datastax.driver.core.Metadata;
47 import com.datastax.driver.core.PreparedStatement;
48 import com.datastax.driver.core.ResultSet;
49 import com.datastax.driver.core.Row;
50 import com.datastax.driver.core.Session;
51 import com.datastax.driver.core.TableMetadata;
52 import com.datastax.driver.core.exceptions.AlreadyExistsException;
53 import com.datastax.driver.core.exceptions.InvalidQueryException;
54 import com.datastax.driver.core.exceptions.NoHostAvailableException;
55 import com.datastax.driver.core.policies.RoundRobinPolicy;
56 import com.datastax.driver.core.HostDistance;
57 import com.datastax.driver.core.PoolingOptions;
64 public class MusicDataStore {
66 private Session session;
67 private Cluster cluster;
74 public void setSession(Session session) {
75 this.session = session;
81 public Session getSession() {
88 public void setCluster(Cluster cluster) {
89 this.cluster = cluster;
94 private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class);
99 public MusicDataStore() {
100 connectToCassaCluster();
108 public MusicDataStore(Cluster cluster, Session session) {
109 this.session = session;
110 this.cluster = cluster;
116 * @throws MusicServiceException
118 public MusicDataStore(String remoteIp) {
120 connectToCassaCluster(remoteIp);
121 } catch (MusicServiceException e) {
122 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage());
130 private ArrayList<String> getAllPossibleLocalIps() {
131 ArrayList<String> allPossibleIps = new ArrayList<String>();
133 Enumeration<NetworkInterface> en = NetworkInterface.getNetworkInterfaces();
134 while (en.hasMoreElements()) {
135 NetworkInterface ni = (NetworkInterface) en.nextElement();
136 Enumeration<InetAddress> ee = ni.getInetAddresses();
137 while (ee.hasMoreElements()) {
138 InetAddress ia = (InetAddress) ee.nextElement();
139 allPossibleIps.add(ia.getHostAddress());
142 } catch (SocketException e) {
143 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.CONNCECTIVITYERROR, ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR);
144 }catch(Exception e) {
145 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), ErrorSeverity.ERROR, ErrorTypes.GENERALSERVICEERROR);
147 return allPossibleIps;
151 * This method iterates through all available IP addresses and connects to multiple cassandra
154 private void connectToCassaCluster() {
155 Iterator<String> it = getAllPossibleLocalIps().iterator();
156 String address = "localhost";
157 String[] addresses = null;
158 address = MusicUtil.getMyCassaHost();
159 addresses = address.split(",");
161 logger.info(EELFLoggerDelegate.applicationLogger,
162 "Connecting to cassa cluster: Iterating through possible ips:"
163 + getAllPossibleLocalIps());
164 PoolingOptions poolingOptions = new PoolingOptions();
166 .setConnectionsPerHost(HostDistance.LOCAL, 4, 10)
167 .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
168 while (it.hasNext()) {
170 if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
171 logger.info(EELFLoggerDelegate.applicationLogger,
172 "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd());
173 cluster = Cluster.builder().withPort(9042)
174 .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
175 //.withLoadBalancingPolicy(new RoundRobinPolicy())
176 .withPoolingOptions(poolingOptions)
177 .addContactPoints(addresses).build();
180 cluster = Cluster.builder().withPort(9042)
181 //.withLoadBalancingPolicy(new RoundRobinPolicy())
182 .addContactPoints(addresses).build();
184 Metadata metadata = cluster.getMetadata();
185 logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
186 + metadata.getClusterName() + " at " + address);
187 session = cluster.connect();
190 } catch (NoHostAvailableException e) {
192 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.HOSTUNAVAILABLE, ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR);
200 public void close() {
205 * This method connects to cassandra cluster on specific address.
209 private void connectToCassaCluster(String address) throws MusicServiceException {
210 String[] addresses = null;
211 addresses = address.split(",");
212 PoolingOptions poolingOptions = new PoolingOptions();
214 .setConnectionsPerHost(HostDistance.LOCAL, 4, 10)
215 .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
216 if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
217 logger.info(EELFLoggerDelegate.applicationLogger,
218 "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd());
219 cluster = Cluster.builder().withPort(9042)
220 .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
221 //.withLoadBalancingPolicy(new RoundRobinPolicy())
222 .withPoolingOptions(poolingOptions)
223 .addContactPoints(addresses).build();
226 cluster = Cluster.builder().withPort(9042)
227 //.withLoadBalancingPolicy(new RoundRobinPolicy())
228 .withPoolingOptions(poolingOptions)
229 .addContactPoints(addresses).build();
231 Metadata metadata = cluster.getMetadata();
232 logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
233 + metadata.getClusterName() + " at " + address);
235 session = cluster.connect();
236 } catch (Exception ex) {
237 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.CASSANDRACONNECTIVITY, ErrorSeverity.ERROR, ErrorTypes.SERVICEUNAVAILABLE);
238 throw new MusicServiceException(
239 "Error while connecting to Cassandra cluster.. " + ex.getMessage());
250 public DataType returnColumnDataType(String keyspace, String tableName, String columnName) {
251 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
252 TableMetadata table = ks.getTable(tableName);
253 return table.getColumn(columnName).getType();
261 * @return TableMetadata
263 public TableMetadata returnColumnMetadata(String keyspace, String tableName) {
264 KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
265 return ks.getTable(tableName);
270 * Utility function to return the Java specific object type.
277 public Object getColValue(Row row, String colName, DataType colType) {
279 switch (colType.getName()) {
281 return row.getString(colName);
283 return row.getUUID(colName);
285 return row.getVarint(colName);
287 return row.getLong(colName);
289 return row.getInt(colName);
291 return row.getFloat(colName);
293 return row.getDouble(colName);
295 return row.getBool(colName);
297 return row.getMap(colName, String.class, String.class);
303 public boolean doesRowSatisfyCondition(Row row, Map<String, Object> condition) throws Exception {
304 ColumnDefinitions colInfo = row.getColumnDefinitions();
306 for (Map.Entry<String, Object> entry : condition.entrySet()) {
307 String colName = entry.getKey();
308 DataType colType = colInfo.getType(colName);
309 Object columnValue = getColValue(row, colName, colType);
310 Object conditionValue = MusicUtil.convertToActualDataType(colType, entry.getValue());
311 if (columnValue.equals(conditionValue) == false)
318 * Utility function to store ResultSet values in to a MAP for output.
323 public Map<String, HashMap<String, Object>> marshalData(ResultSet results) {
324 Map<String, HashMap<String, Object>> resultMap =
325 new HashMap<String, HashMap<String, Object>>();
327 for (Row row : results) {
328 ColumnDefinitions colInfo = row.getColumnDefinitions();
329 HashMap<String, Object> resultOutput = new HashMap<String, Object>();
330 for (Definition definition : colInfo) {
331 if (!definition.getName().equals("vector_ts"))
332 resultOutput.put(definition.getName(),
333 getColValue(row, definition.getName(), definition.getType()));
335 resultMap.put("row " + counter, resultOutput);
342 // Prepared Statements 1802 additions
344 * This Method performs DDL and DML operations on Cassandra using specified consistency level
346 * @param queryObject Object containing cassandra prepared query and values.
347 * @param consistency Specify consistency level for data synchronization across cassandra
349 * @return Boolean Indicates operation success or failure
350 * @throws MusicServiceException
351 * @throws MusicQueryException
353 public boolean executePut(PreparedQueryObject queryObject, String consistency)
354 throws MusicServiceException, MusicQueryException {
356 boolean result = false;
358 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
359 logger.error(EELFLoggerDelegate.errorLogger, queryObject.getQuery(),AppMessages.QUERYERROR, ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
360 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
361 + queryObject.getQuery() + "]");
363 logger.info(EELFLoggerDelegate.applicationLogger,
364 "In preprared Execute Put: the actual insert query:"
365 + queryObject.getQuery() + "; the values"
366 + queryObject.getValues());
367 PreparedStatement preparedInsert = null;
370 preparedInsert = session.prepare(queryObject.getQuery());
372 } catch(InvalidQueryException iqe) {
373 logger.error(EELFLoggerDelegate.errorLogger, iqe.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
374 throw new MusicQueryException(iqe.getMessage());
375 }catch(Exception e) {
376 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
377 throw new MusicQueryException(e.getMessage());
381 if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
382 logger.info(EELFLoggerDelegate.applicationLogger, "Executing critical put query");
383 preparedInsert.setConsistencyLevel(ConsistencyLevel.QUORUM);
384 } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
385 logger.info(EELFLoggerDelegate.applicationLogger, "Executing simple put query");
386 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
389 ResultSet rs = session.execute(preparedInsert.bind(queryObject.getValues().toArray()));
390 result = rs.wasApplied();
393 catch (AlreadyExistsException ae) {
394 logger.error(EELFLoggerDelegate.errorLogger, ae.getMessage(),AppMessages.SESSIONFAILED+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
395 throw new MusicServiceException(ae.getMessage());
397 catch (Exception e) {
398 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.SESSIONFAILED+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
399 throw new MusicQueryException("Executing Session Failure for Request = " + "["
400 + queryObject.getQuery() + "]" + " Reason = " + e.getMessage());
408 * This method performs DDL operations on Cassandra using consistency level ONE.
410 * @param queryObject Object containing cassandra prepared query and values.
412 * @throws MusicServiceException
413 * @throws MusicQueryException
415 public ResultSet executeEventualGet(PreparedQueryObject queryObject)
416 throws MusicServiceException, MusicQueryException {
418 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
419 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
420 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
421 + queryObject.getQuery() + "]");
423 logger.info(EELFLoggerDelegate.applicationLogger,
424 "Executing Eventual get query:" + queryObject.getQuery());
426 ResultSet results = null;
428 PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
429 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE);
430 results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
432 } catch (Exception ex) {
433 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
434 throw new MusicServiceException(ex.getMessage());
441 * This method performs DDL operation on Cassandra using consistency level QUORUM.
443 * @param queryObject Object containing cassandra prepared query and values.
445 * @throws MusicServiceException
446 * @throws MusicQueryException
448 public ResultSet executeCriticalGet(PreparedQueryObject queryObject)
449 throws MusicServiceException, MusicQueryException {
450 if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
451 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
452 throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
453 + queryObject.getQuery() + "]");
455 logger.info(EELFLoggerDelegate.applicationLogger,
456 "Executing Critical get query:" + queryObject.getQuery());
457 PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
458 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.QUORUM);
459 ResultSet results = null;
461 results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
462 } catch (Exception ex) {
463 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
464 throw new MusicServiceException(ex.getMessage());