Merge "Fixed sonar issue in JsonSelect"
[music.git] / src / main / java / org / onap / music / datastore / MusicDataStore.java
1 /*
2  * ============LICENSE_START==========================================
3  * org.onap.music
4  * ===================================================================
5  *  Copyright (c) 2017 AT&T Intellectual Property
6  * ===================================================================
7  *  Modifications Copyright (c) 2018 IBM
8  * ===================================================================
9  *  Licensed under the Apache License, Version 2.0 (the "License");
10  *  you may not use this file except in compliance with the License.
11  *  You may obtain a copy of the License at
12  * 
13  *     http://www.apache.org/licenses/LICENSE-2.0
14  * 
15  *  Unless required by applicable law or agreed to in writing, software
16  *  distributed under the License is distributed on an "AS IS" BASIS,
17  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  *  See the License for the specific language governing permissions and
19  *  limitations under the License.
20  * 
21  * ============LICENSE_END=============================================
22  * ====================================================================
23  */
24 package org.onap.music.datastore;
25
26 import java.net.InetAddress;
27 import java.net.NetworkInterface;
28 import java.net.SocketException;
29 import java.nio.ByteBuffer;
30 import java.util.ArrayList;
31 import java.util.Enumeration;
32 import java.util.HashMap;
33 import java.util.Iterator;
34 import java.util.Map;
35
36 import org.onap.music.eelf.logging.EELFLoggerDelegate;
37 import org.onap.music.eelf.logging.format.AppMessages;
38 import org.onap.music.eelf.logging.format.ErrorSeverity;
39 import org.onap.music.eelf.logging.format.ErrorTypes;
40 import org.onap.music.exceptions.MusicQueryException;
41 import org.onap.music.exceptions.MusicServiceException;
42 import org.onap.music.main.MusicUtil;
43
44 import com.datastax.driver.core.Cluster;
45 import com.datastax.driver.core.ColumnDefinitions;
46 import com.datastax.driver.core.ColumnDefinitions.Definition;
47 import com.datastax.driver.core.ConsistencyLevel;
48 import com.datastax.driver.core.DataType;
49 import com.datastax.driver.core.HostDistance;
50 import com.datastax.driver.core.KeyspaceMetadata;
51 import com.datastax.driver.core.Metadata;
52 import com.datastax.driver.core.PoolingOptions;
53 import com.datastax.driver.core.PreparedStatement;
54 import com.datastax.driver.core.ResultSet;
55 import com.datastax.driver.core.Row;
56 import com.datastax.driver.core.Session;
57 import com.datastax.driver.core.TableMetadata;
58 import com.datastax.driver.core.exceptions.AlreadyExistsException;
59 import com.datastax.driver.core.exceptions.InvalidQueryException;
60 import com.datastax.driver.core.exceptions.NoHostAvailableException;
61
62 /**
63  * @author nelson24
64  *
65  */
66 public class MusicDataStore {
67
68     private Session session;
69     private Cluster cluster;
70
71
72
73     private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class);
74
75     /**
76      *
77      */
78     public MusicDataStore() {
79         connectToCassaCluster();
80     }
81
82
83     /**
84      * @param cluster
85      * @param session
86      */
87     public MusicDataStore(Cluster cluster, Session session) {
88         this.session = session;
89         this.cluster = cluster;
90     }
91
92     /**
93      *
94      * @param remoteIp
95      * @throws MusicServiceException
96      */
97     public MusicDataStore(String remoteIp) {
98         try {
99             connectToCassaCluster(remoteIp);
100         } catch (MusicServiceException e) {
101             logger.error("Exception", e);
102             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage());
103         }
104     }
105
106
107
108     /**
109      * @param session
110      */
111     public void setSession(Session session) {
112         this.session = session;
113     }
114     
115     /**
116      * @param session
117      */
118     public Session getSession() {
119         return session;
120     }
121
122     /**
123      * @param cluster
124      */
125     public void setCluster(Cluster cluster) {
126         this.cluster = cluster;
127     }
128
129     /**
130      * 
131      * @return
132      */
133     private ArrayList<String> getAllPossibleLocalIps() {
134         ArrayList<String> allPossibleIps = new ArrayList<>();
135         try {
136             Enumeration<NetworkInterface> en = NetworkInterface.getNetworkInterfaces();
137             while (en.hasMoreElements()) {
138                 NetworkInterface ni = en.nextElement();
139                 Enumeration<InetAddress> ee = ni.getInetAddresses();
140                 while (ee.hasMoreElements()) {
141                     InetAddress ia = ee.nextElement();
142                     allPossibleIps.add(ia.getHostAddress());
143                 }
144             }
145         } catch (SocketException e) {
146             logger.error("Exception", e);
147             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.CONNCECTIVITYERROR, ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR);
148         }catch(Exception e) {
149             logger.error("Exception", e);
150                 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), ErrorSeverity.ERROR, ErrorTypes.GENERALSERVICEERROR);
151         }
152         return allPossibleIps;
153     }
154
155     /**
156      * This method iterates through all available IP addresses and connects to multiple cassandra
157      * clusters.
158      */
159     private void connectToCassaCluster() {
160         Iterator<String> it = getAllPossibleLocalIps().iterator();
161         String address = "localhost";
162         String[] addresses = null;
163         address = MusicUtil.getMyCassaHost();
164                 addresses = address.split(",");
165                 
166         logger.info(EELFLoggerDelegate.applicationLogger,
167                         "Connecting to cassa cluster: Iterating through possible ips:"
168                                         + getAllPossibleLocalIps());
169         PoolingOptions poolingOptions = new PoolingOptions();
170         poolingOptions
171         .setConnectionsPerHost(HostDistance.LOCAL,  4, 10)
172         .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
173         while (it.hasNext()) {
174             try {
175                 if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
176                         logger.info(EELFLoggerDelegate.applicationLogger,
177                                         "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd());
178                         cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
179                                            .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
180                                            //.withLoadBalancingPolicy(new RoundRobinPolicy())
181                                            .withPoolingOptions(poolingOptions)
182                                            .addContactPoints(addresses).build();
183                 }
184                 else
185                         cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
186                                                                 //.withLoadBalancingPolicy(new RoundRobinPolicy())
187                                                                 .addContactPoints(addresses).build();
188                 
189                 Metadata metadata = cluster.getMetadata();
190                 logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
191                                 + metadata.getClusterName() + " at " + address);
192                 session = cluster.connect();
193
194                 break;
195             } catch (NoHostAvailableException e) {
196                 logger.error("Exception", e);
197                 address = it.next();
198                 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.HOSTUNAVAILABLE, ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR);
199             }
200         }
201     }
202
203     /**
204      * 
205      */
206     public void close() {
207         session.close();
208     }
209
210     /**
211      * This method connects to cassandra cluster on specific address.
212      * 
213      * @param address
214      */
215     private void connectToCassaCluster(String address) throws MusicServiceException {
216         String[] addresses = null;
217                 addresses = address.split(",");
218                 PoolingOptions poolingOptions = new PoolingOptions();
219         poolingOptions
220         .setConnectionsPerHost(HostDistance.LOCAL,  4, 10)
221         .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
222         if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
223                 logger.info(EELFLoggerDelegate.applicationLogger,
224                                 "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd());
225                 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
226                            .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
227                            //.withLoadBalancingPolicy(new RoundRobinPolicy())
228                            .withPoolingOptions(poolingOptions)
229                            .addContactPoints(addresses).build();
230         }
231         else {
232                 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
233                                         //.withLoadBalancingPolicy(new RoundRobinPolicy())
234                                         .withPoolingOptions(poolingOptions)
235                                         .addContactPoints(addresses).build();
236         }
237         Metadata metadata = cluster.getMetadata();
238         logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
239                         + metadata.getClusterName() + " at " + address);
240         try {
241             session = cluster.connect();
242         } catch (Exception ex) {
243             logger.error("Exception", ex);
244             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.CASSANDRACONNECTIVITY, ErrorSeverity.ERROR, ErrorTypes.SERVICEUNAVAILABLE);
245             throw new MusicServiceException(
246                             "Error while connecting to Cassandra cluster.. " + ex.getMessage());
247         }
248     }
249
250     /**
251      * 
252      * @param keyspace
253      * @param tableName
254      * @param columnName
255      * @return DataType
256      */
257     public DataType returnColumnDataType(String keyspace, String tableName, String columnName) {
258         KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
259         TableMetadata table = ks.getTable(tableName);
260         return table.getColumn(columnName).getType();
261
262     }
263
264     /**
265      * 
266      * @param keyspace
267      * @param tableName
268      * @return TableMetadata
269      */
270     public TableMetadata returnColumnMetadata(String keyspace, String tableName) {
271         KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
272         return ks.getTable(tableName);
273     }
274
275
276     /**
277      * Utility function to return the Java specific object type.
278      * 
279      * @param row
280      * @param colName
281      * @param colType
282      * @return
283      */
284     public Object getColValue(Row row, String colName, DataType colType) {
285
286         switch (colType.getName()) {
287             case VARCHAR:
288                 return row.getString(colName);
289             case UUID:
290                 return row.getUUID(colName);
291             case VARINT:
292                 return row.getVarint(colName);
293             case BIGINT:
294                 return row.getLong(colName);
295             case INT:
296                 return row.getInt(colName);
297             case FLOAT:
298                 return row.getFloat(colName);
299             case DOUBLE:
300                 return row.getDouble(colName);
301             case BOOLEAN:
302                 return row.getBool(colName);
303             case MAP:
304                 return row.getMap(colName, String.class, String.class);
305             case LIST:
306                 return row.getList(colName, String.class);
307             default:
308                 return null;
309         }
310     }
311     
312     public byte[] getBlobValue(Row row, String colName, DataType colType) {
313         ByteBuffer bb = row.getBytes(colName);
314         return bb.array();
315     }
316
317     public boolean doesRowSatisfyCondition(Row row, Map<String, Object> condition) throws Exception {
318         ColumnDefinitions colInfo = row.getColumnDefinitions();
319
320         for (Map.Entry<String, Object> entry : condition.entrySet()) {
321             String colName = entry.getKey();
322             DataType colType = colInfo.getType(colName);
323             Object columnValue = getColValue(row, colName, colType);
324             Object conditionValue = MusicUtil.convertToActualDataType(colType, entry.getValue());
325             if (!columnValue.equals(conditionValue))
326                 return false;
327         }
328         return true;
329     }
330
331     /**
332      * Utility function to store ResultSet values in to a MAP for output.
333      * 
334      * @param results
335      * @return MAP
336      */
337     public Map<String, HashMap<String, Object>> marshalData(ResultSet results) {
338         Map<String, HashMap<String, Object>> resultMap =
339                         new HashMap<>();
340         int counter = 0;
341         for (Row row : results) {
342             ColumnDefinitions colInfo = row.getColumnDefinitions();
343             HashMap<String, Object> resultOutput = new HashMap<>();
344             for (Definition definition : colInfo) {
345                 if (!(("vector_ts").equals(definition.getName()))) {
346                         if(definition.getType().toString().toLowerCase().contains("blob")) {
347                                 resultOutput.put(definition.getName(),
348                                 getBlobValue(row, definition.getName(), definition.getType()));
349                         } 
350                         else
351                                 resultOutput.put(definition.getName(),
352                                     getColValue(row, definition.getName(), definition.getType()));
353                 }
354             }
355             resultMap.put("row " + counter, resultOutput);
356             counter++;
357         }
358         return resultMap;
359     }
360
361
362     // Prepared Statements 1802 additions
363     /**
364      * This Method performs DDL and DML operations on Cassandra using specified consistency level
365      * 
366      * @param queryObject Object containing cassandra prepared query and values.
367      * @param consistency Specify consistency level for data synchronization across cassandra
368      *        replicas
369      * @return Boolean Indicates operation success or failure
370      * @throws MusicServiceException
371      * @throws MusicQueryException
372      */
373     public boolean executePut(PreparedQueryObject queryObject, String consistency)
374                     throws MusicServiceException, MusicQueryException {
375
376         boolean result = false;
377
378         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
379                 logger.error(EELFLoggerDelegate.errorLogger, queryObject.getQuery(),AppMessages.QUERYERROR, ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
380             throw new MusicQueryException("Ill formed queryObject for the request = " + "["
381                             + queryObject.getQuery() + "]");
382         }
383         logger.info(EELFLoggerDelegate.applicationLogger,
384                         "In preprared Execute Put: the actual insert query:"
385                                         + queryObject.getQuery() + "; the values"
386                                         + queryObject.getValues());
387         PreparedStatement preparedInsert = null;
388         try {
389                 
390                                 preparedInsert = session.prepare(queryObject.getQuery());
391                         
392         } catch(InvalidQueryException iqe) {
393             logger.error("Exception", iqe);
394                 logger.error(EELFLoggerDelegate.errorLogger, iqe.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
395                 throw new MusicQueryException(iqe.getMessage());
396         }catch(Exception e) {
397             logger.error("Exception", e);
398                 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
399                 throw new MusicQueryException(e.getMessage());
400         }
401         
402         try {
403             if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
404                 logger.info(EELFLoggerDelegate.applicationLogger, "Executing critical put query");
405                 preparedInsert.setConsistencyLevel(ConsistencyLevel.QUORUM);
406             } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
407                 logger.info(EELFLoggerDelegate.applicationLogger, "Executing simple put query");
408                 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
409             }
410
411             ResultSet rs = session.execute(preparedInsert.bind(queryObject.getValues().toArray()));
412             result = rs.wasApplied();
413
414         }
415         catch (AlreadyExistsException ae) {
416             logger.error("Exception", ae);
417             logger.error(EELFLoggerDelegate.errorLogger, ae.getMessage(),AppMessages.SESSIONFAILED+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
418                 throw new MusicServiceException(ae.getMessage());
419         }
420         catch (Exception e) {
421             logger.error("Exception", e);
422                 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.SESSIONFAILED+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
423                 throw new MusicQueryException("Executing Session Failure for Request = " + "["
424                             + queryObject.getQuery() + "]" + " Reason = " + e.getMessage());
425         }
426
427
428         return result;
429     }
430
431     /**
432      * This method performs DDL operations on Cassandra using consistency level ONE.
433      * 
434      * @param queryObject Object containing cassandra prepared query and values.
435      * @return ResultSet
436      * @throws MusicServiceException
437      * @throws MusicQueryException
438      */
439     public ResultSet executeEventualGet(PreparedQueryObject queryObject)
440                     throws MusicServiceException, MusicQueryException {
441
442         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
443                 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
444                 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
445                             + queryObject.getQuery() + "]");
446         }
447         logger.info(EELFLoggerDelegate.applicationLogger,
448                         "Executing Eventual  get query:" + queryObject.getQuery());
449        
450         ResultSet results = null;
451         try {
452                  PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
453              preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE);
454              results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
455
456         } catch (Exception ex) {
457             logger.error("Exception", ex);
458                 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
459                 throw new MusicServiceException(ex.getMessage());
460         }
461         return results;
462     }
463
464     /**
465      * 
466      * This method performs DDL operation on Cassandra using consistency level QUORUM.
467      * 
468      * @param queryObject Object containing cassandra prepared query and values.
469      * @return ResultSet
470      * @throws MusicServiceException
471      * @throws MusicQueryException
472      */
473     public ResultSet executeCriticalGet(PreparedQueryObject queryObject)
474                     throws MusicServiceException, MusicQueryException {
475         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
476                 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
477             throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
478                             + queryObject.getQuery() + "]");
479         }
480         logger.info(EELFLoggerDelegate.applicationLogger,
481                         "Executing Critical get query:" + queryObject.getQuery());
482         PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
483         preparedEventualGet.setConsistencyLevel(ConsistencyLevel.QUORUM);
484         ResultSet results = null;
485         try {
486             results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
487         } catch (Exception ex) {
488             logger.error("Exception", ex);
489                 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
490                 throw new MusicServiceException(ex.getMessage());
491         }
492         return results;
493
494     }
495
496 }