fixed sonar issues in MusicDataStore.java
[music.git] / src / main / java / org / onap / music / datastore / MusicDataStore.java
1 /*
2  * ============LICENSE_START==========================================
3  * org.onap.music
4  * ===================================================================
5  *  Copyright (c) 2017 AT&T Intellectual Property
6  * ===================================================================
7  *  Modifications Copyright (c) 2018 IBM
8  * ===================================================================
9  *  Licensed under the Apache License, Version 2.0 (the "License");
10  *  you may not use this file except in compliance with the License.
11  *  You may obtain a copy of the License at
12  * 
13  *     http://www.apache.org/licenses/LICENSE-2.0
14  * 
15  *  Unless required by applicable law or agreed to in writing, software
16  *  distributed under the License is distributed on an "AS IS" BASIS,
17  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  *  See the License for the specific language governing permissions and
19  *  limitations under the License.
20  * 
21  * ============LICENSE_END=============================================
22  * ====================================================================
23  */
24 package org.onap.music.datastore;
25
26 import java.net.InetAddress;
27 import java.net.NetworkInterface;
28 import java.net.SocketException;
29 import java.nio.ByteBuffer;
30 import java.util.ArrayList;
31 import java.util.Enumeration;
32 import java.util.HashMap;
33 import java.util.Iterator;
34 import java.util.Map;
35
36 import org.onap.music.eelf.logging.EELFLoggerDelegate;
37 import org.onap.music.eelf.logging.format.AppMessages;
38 import org.onap.music.eelf.logging.format.ErrorSeverity;
39 import org.onap.music.eelf.logging.format.ErrorTypes;
40 import org.onap.music.exceptions.MusicQueryException;
41 import org.onap.music.exceptions.MusicServiceException;
42 import org.onap.music.main.MusicUtil;
43
44 import com.datastax.driver.core.Cluster;
45 import com.datastax.driver.core.ColumnDefinitions;
46 import com.datastax.driver.core.ColumnDefinitions.Definition;
47 import com.datastax.driver.core.ConsistencyLevel;
48 import com.datastax.driver.core.DataType;
49 import com.datastax.driver.core.HostDistance;
50 import com.datastax.driver.core.KeyspaceMetadata;
51 import com.datastax.driver.core.Metadata;
52 import com.datastax.driver.core.PoolingOptions;
53 import com.datastax.driver.core.PreparedStatement;
54 import com.datastax.driver.core.ResultSet;
55 import com.datastax.driver.core.Row;
56 import com.datastax.driver.core.Session;
57 import com.datastax.driver.core.TableMetadata;
58 import com.datastax.driver.core.exceptions.AlreadyExistsException;
59 import com.datastax.driver.core.exceptions.InvalidQueryException;
60 import com.datastax.driver.core.exceptions.NoHostAvailableException;
61
62 /**
63  * @author nelson24
64  *
65  */
66 public class MusicDataStore {
67
68     private Session session;
69     private Cluster cluster;
70
71
72
73     /**
74      * @param session
75      */
76     public void setSession(Session session) {
77         this.session = session;
78     }
79     
80     /**
81      * @param session
82      */
83     public Session getSession() {
84         return session;
85     }
86
87     /**
88      * @param cluster
89      */
90     public void setCluster(Cluster cluster) {
91         this.cluster = cluster;
92     }
93
94
95
96     private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class);
97
98     /**
99      * 
100      */
101     public MusicDataStore() {
102         connectToCassaCluster();
103     }
104
105
106     /**
107      * @param cluster
108      * @param session
109      */
110     public MusicDataStore(Cluster cluster, Session session) {
111         this.session = session;
112         this.cluster = cluster;
113     }
114
115     /**
116      * 
117      * @param remoteIp
118      * @throws MusicServiceException
119      */
120     public MusicDataStore(String remoteIp) {
121         try {
122             connectToCassaCluster(remoteIp);
123         } catch (MusicServiceException e) {
124             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage());
125         }
126     }
127
128     /**
129      * 
130      * @return
131      */
132     private ArrayList<String> getAllPossibleLocalIps() {
133         ArrayList<String> allPossibleIps = new ArrayList<>();
134         try {
135             Enumeration<NetworkInterface> en = NetworkInterface.getNetworkInterfaces();
136             while (en.hasMoreElements()) {
137                 NetworkInterface ni = (NetworkInterface) en.nextElement();
138                 Enumeration<InetAddress> ee = ni.getInetAddresses();
139                 while (ee.hasMoreElements()) {
140                     InetAddress ia = (InetAddress) ee.nextElement();
141                     allPossibleIps.add(ia.getHostAddress());
142                 }
143             }
144         } catch (SocketException e) {
145             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.CONNCECTIVITYERROR, ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR);
146         }catch(Exception e) {
147                 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), ErrorSeverity.ERROR, ErrorTypes.GENERALSERVICEERROR);
148         }
149         return allPossibleIps;
150     }
151
152     /**
153      * This method iterates through all available IP addresses and connects to multiple cassandra
154      * clusters.
155      */
156     private void connectToCassaCluster() {
157         Iterator<String> it = getAllPossibleLocalIps().iterator();
158         String address = "localhost";
159         String[] addresses = null;
160         address = MusicUtil.getMyCassaHost();
161                 addresses = address.split(",");
162                 
163         logger.info(EELFLoggerDelegate.applicationLogger,
164                         "Connecting to cassa cluster: Iterating through possible ips:"
165                                         + getAllPossibleLocalIps());
166         PoolingOptions poolingOptions = new PoolingOptions();
167         poolingOptions
168         .setConnectionsPerHost(HostDistance.LOCAL,  4, 10)
169         .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
170         while (it.hasNext()) {
171             try {
172                 if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
173                         logger.info(EELFLoggerDelegate.applicationLogger,
174                                         "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd());
175                         cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
176                                            .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
177                                            //.withLoadBalancingPolicy(new RoundRobinPolicy())
178                                            .withPoolingOptions(poolingOptions)
179                                            .addContactPoints(addresses).build();
180                 }
181                 else
182                         cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
183                                                                 //.withLoadBalancingPolicy(new RoundRobinPolicy())
184                                                                 .addContactPoints(addresses).build();
185                 
186                 Metadata metadata = cluster.getMetadata();
187                 logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
188                                 + metadata.getClusterName() + " at " + address);
189                 session = cluster.connect();
190
191                 break;
192             } catch (NoHostAvailableException e) {
193                 address = it.next();
194                 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.HOSTUNAVAILABLE, ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR);
195             }
196         }
197     }
198
199     /**
200      * 
201      */
202     public void close() {
203         session.close();
204     }
205
206     /**
207      * This method connects to cassandra cluster on specific address.
208      * 
209      * @param address
210      */
211     private void connectToCassaCluster(String address) throws MusicServiceException {
212         String[] addresses = null;
213                 addresses = address.split(",");
214                 PoolingOptions poolingOptions = new PoolingOptions();
215         poolingOptions
216         .setConnectionsPerHost(HostDistance.LOCAL,  4, 10)
217         .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
218         if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
219                 logger.info(EELFLoggerDelegate.applicationLogger,
220                                 "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd());
221                 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
222                            .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
223                            //.withLoadBalancingPolicy(new RoundRobinPolicy())
224                            .withPoolingOptions(poolingOptions)
225                            .addContactPoints(addresses).build();
226         }
227         else {
228                 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
229                                         //.withLoadBalancingPolicy(new RoundRobinPolicy())
230                                         .withPoolingOptions(poolingOptions)
231                                         .addContactPoints(addresses).build();
232         }
233         Metadata metadata = cluster.getMetadata();
234         logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
235                         + metadata.getClusterName() + " at " + address);
236         try {
237             session = cluster.connect();
238         } catch (Exception ex) {
239             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.CASSANDRACONNECTIVITY, ErrorSeverity.ERROR, ErrorTypes.SERVICEUNAVAILABLE);
240             throw new MusicServiceException(
241                             "Error while connecting to Cassandra cluster.. " + ex.getMessage());
242         }
243     }
244
245     /**
246      * 
247      * @param keyspace
248      * @param tableName
249      * @param columnName
250      * @return DataType
251      */
252     public DataType returnColumnDataType(String keyspace, String tableName, String columnName) {
253         KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
254         TableMetadata table = ks.getTable(tableName);
255         return table.getColumn(columnName).getType();
256
257     }
258
259     /**
260      * 
261      * @param keyspace
262      * @param tableName
263      * @return TableMetadata
264      */
265     public TableMetadata returnColumnMetadata(String keyspace, String tableName) {
266         KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
267         return ks.getTable(tableName);
268     }
269
270
271     /**
272      * Utility function to return the Java specific object type.
273      * 
274      * @param row
275      * @param colName
276      * @param colType
277      * @return
278      */
279     public Object getColValue(Row row, String colName, DataType colType) {
280
281         switch (colType.getName()) {
282             case VARCHAR:
283                 return row.getString(colName);
284             case UUID:
285                 return row.getUUID(colName);
286             case VARINT:
287                 return row.getVarint(colName);
288             case BIGINT:
289                 return row.getLong(colName);
290             case INT:
291                 return row.getInt(colName);
292             case FLOAT:
293                 return row.getFloat(colName);
294             case DOUBLE:
295                 return row.getDouble(colName);
296             case BOOLEAN:
297                 return row.getBool(colName);
298             case MAP:
299                 return row.getMap(colName, String.class, String.class);
300             case LIST:
301                 return row.getList(colName, String.class);
302             default:
303                 return null;
304         }
305     }
306     
307     public byte[] getBlobValue(Row row, String colName, DataType colType) {
308         ByteBuffer bb = row.getBytes(colName);
309         byte[] data = bb.array();
310         return data;
311     }
312
313     public boolean doesRowSatisfyCondition(Row row, Map<String, Object> condition) throws Exception {
314         ColumnDefinitions colInfo = row.getColumnDefinitions();
315
316         for (Map.Entry<String, Object> entry : condition.entrySet()) {
317             String colName = entry.getKey();
318             DataType colType = colInfo.getType(colName);
319             Object columnValue = getColValue(row, colName, colType);
320             Object conditionValue = MusicUtil.convertToActualDataType(colType, entry.getValue());
321             if (!columnValue.equals(conditionValue))
322                 return false;
323         }
324         return true;
325     }
326
327     /**
328      * Utility function to store ResultSet values in to a MAP for output.
329      * 
330      * @param results
331      * @return MAP
332      */
333     public Map<String, HashMap<String, Object>> marshalData(ResultSet results) {
334         Map<String, HashMap<String, Object>> resultMap =
335                         new HashMap<>();
336         int counter = 0;
337         for (Row row : results) {
338             ColumnDefinitions colInfo = row.getColumnDefinitions();
339             HashMap<String, Object> resultOutput = new HashMap<>();
340             for (Definition definition : colInfo) {
341                 if (!definition.getName().equals("vector_ts")) {
342                         if(definition.getType().toString().toLowerCase().contains("blob")) {
343                                 resultOutput.put(definition.getName(),
344                                 getBlobValue(row, definition.getName(), definition.getType()));
345                         } 
346                         else
347                                 resultOutput.put(definition.getName(),
348                                     getColValue(row, definition.getName(), definition.getType()));
349                 }
350             }
351             resultMap.put("row " + counter, resultOutput);
352             counter++;
353         }
354         return resultMap;
355     }
356
357
358     // Prepared Statements 1802 additions
359     /**
360      * This Method performs DDL and DML operations on Cassandra using specified consistency level
361      * 
362      * @param queryObject Object containing cassandra prepared query and values.
363      * @param consistency Specify consistency level for data synchronization across cassandra
364      *        replicas
365      * @return Boolean Indicates operation success or failure
366      * @throws MusicServiceException
367      * @throws MusicQueryException
368      */
369     public boolean executePut(PreparedQueryObject queryObject, String consistency)
370                     throws MusicServiceException, MusicQueryException {
371
372         boolean result = false;
373
374         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
375                 logger.error(EELFLoggerDelegate.errorLogger, queryObject.getQuery(),AppMessages.QUERYERROR, ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
376             throw new MusicQueryException("Ill formed queryObject for the request = " + "["
377                             + queryObject.getQuery() + "]");
378         }
379         logger.info(EELFLoggerDelegate.applicationLogger,
380                         "In preprared Execute Put: the actual insert query:"
381                                         + queryObject.getQuery() + "; the values"
382                                         + queryObject.getValues());
383         PreparedStatement preparedInsert = null;
384         try {
385                 
386                                 preparedInsert = session.prepare(queryObject.getQuery());
387                         
388         } catch(InvalidQueryException iqe) {
389                 logger.error(EELFLoggerDelegate.errorLogger, iqe.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
390                 throw new MusicQueryException(iqe.getMessage());
391         }catch(Exception e) {
392                 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
393                 throw new MusicQueryException(e.getMessage());
394         }
395         
396         try {
397             if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
398                 logger.info(EELFLoggerDelegate.applicationLogger, "Executing critical put query");
399                 preparedInsert.setConsistencyLevel(ConsistencyLevel.QUORUM);
400             } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
401                 logger.info(EELFLoggerDelegate.applicationLogger, "Executing simple put query");
402                 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
403             }
404
405             ResultSet rs = session.execute(preparedInsert.bind(queryObject.getValues().toArray()));
406             result = rs.wasApplied();
407
408         }
409         catch (AlreadyExistsException ae) {
410             logger.error(EELFLoggerDelegate.errorLogger, ae.getMessage(),AppMessages.SESSIONFAILED+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
411                 throw new MusicServiceException(ae.getMessage());
412         }
413         catch (Exception e) {
414                 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.SESSIONFAILED+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
415                 throw new MusicQueryException("Executing Session Failure for Request = " + "["
416                             + queryObject.getQuery() + "]" + " Reason = " + e.getMessage());
417         }
418
419
420         return result;
421     }
422
423     /**
424      * This method performs DDL operations on Cassandra using consistency level ONE.
425      * 
426      * @param queryObject Object containing cassandra prepared query and values.
427      * @return ResultSet
428      * @throws MusicServiceException
429      * @throws MusicQueryException
430      */
431     public ResultSet executeEventualGet(PreparedQueryObject queryObject)
432                     throws MusicServiceException, MusicQueryException {
433
434         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
435                 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
436                 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
437                             + queryObject.getQuery() + "]");
438         }
439         logger.info(EELFLoggerDelegate.applicationLogger,
440                         "Executing Eventual  get query:" + queryObject.getQuery());
441        
442         ResultSet results = null;
443         try {
444                  PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
445              preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE);
446              results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
447
448         } catch (Exception ex) {
449                 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
450                 throw new MusicServiceException(ex.getMessage());
451         }
452         return results;
453     }
454
455     /**
456      * 
457      * This method performs DDL operation on Cassandra using consistency level QUORUM.
458      * 
459      * @param queryObject Object containing cassandra prepared query and values.
460      * @return ResultSet
461      * @throws MusicServiceException
462      * @throws MusicQueryException
463      */
464     public ResultSet executeCriticalGet(PreparedQueryObject queryObject)
465                     throws MusicServiceException, MusicQueryException {
466         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
467                 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
468             throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
469                             + queryObject.getQuery() + "]");
470         }
471         logger.info(EELFLoggerDelegate.applicationLogger,
472                         "Executing Critical get query:" + queryObject.getQuery());
473         PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
474         preparedEventualGet.setConsistencyLevel(ConsistencyLevel.QUORUM);
475         ResultSet results = null;
476         try {
477             results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
478         } catch (Exception ex) {
479                 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
480                 throw new MusicServiceException(ex.getMessage());
481         }
482         return results;
483
484     }
485
486 }