various Updates
[music.git] / src / main / java / org / onap / music / datastore / MusicDataStore.java
1 /*
2  * ============LICENSE_START==========================================
3  * org.onap.music
4  * ===================================================================
5  *  Copyright (c) 2017 AT&T Intellectual Property
6  * ===================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  * 
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  * 
19  * ============LICENSE_END=============================================
20  * ====================================================================
21  */
22 package org.onap.music.datastore;
23
24 import java.net.InetAddress;
25 import java.net.NetworkInterface;
26 import java.net.SocketException;
27 import java.nio.ByteBuffer;
28 import java.util.ArrayList;
29 import java.util.Enumeration;
30 import java.util.HashMap;
31 import java.util.Iterator;
32 import java.util.Map;
33 import org.onap.music.eelf.logging.EELFLoggerDelegate;
34 import org.onap.music.eelf.logging.format.AppMessages;
35 import org.onap.music.eelf.logging.format.ErrorSeverity;
36 import org.onap.music.eelf.logging.format.ErrorTypes;
37 import org.onap.music.exceptions.MusicQueryException;
38 import org.onap.music.exceptions.MusicServiceException;
39 import org.onap.music.main.MusicUtil;
40 import com.datastax.driver.core.Cluster;
41 import com.datastax.driver.core.ColumnDefinitions;
42 import com.datastax.driver.core.ColumnDefinitions.Definition;
43 import com.datastax.driver.core.ConsistencyLevel;
44 import com.datastax.driver.core.DataType;
45 import com.datastax.driver.core.KeyspaceMetadata;
46 import com.datastax.driver.core.Metadata;
47 import com.datastax.driver.core.PreparedStatement;
48 import com.datastax.driver.core.ResultSet;
49 import com.datastax.driver.core.Row;
50 import com.datastax.driver.core.Session;
51 import com.datastax.driver.core.TableMetadata;
52 import com.datastax.driver.core.exceptions.AlreadyExistsException;
53 import com.datastax.driver.core.exceptions.InvalidQueryException;
54 import com.datastax.driver.core.exceptions.NoHostAvailableException;
55 import com.sun.jersey.core.util.Base64;
56
57 /**
58  * @author nelson24
59  *
60  */
61 public class MusicDataStore {
62
63     private Session session;
64     private Cluster cluster;
65
66
67
68     /**
69      * @param session
70      */
71     public void setSession(Session session) {
72         this.session = session;
73     }
74     
75     /**
76      * @param session
77      */
78     public Session getSession() {
79         return session;
80     }
81
82     /**
83      * @param cluster
84      */
85     public void setCluster(Cluster cluster) {
86         this.cluster = cluster;
87     }
88
89
90
91     private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class);
92
93     /**
94      * 
95      */
96     public MusicDataStore() {
97         connectToCassaCluster();
98     }
99
100
101     /**
102      * @param cluster
103      * @param session
104      */
105     public MusicDataStore(Cluster cluster, Session session) {
106         this.session = session;
107         this.cluster = cluster;
108     }
109
110     /**
111      * 
112      * @param remoteIp
113      * @throws MusicServiceException
114      */
115     public MusicDataStore(String remoteIp) {
116         try {
117             connectToCassaCluster(remoteIp);
118         } catch (MusicServiceException e) {
119             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage());
120         }
121     }
122
123     /**
124      * 
125      * @return
126      */
127     private ArrayList<String> getAllPossibleLocalIps() {
128         ArrayList<String> allPossibleIps = new ArrayList<String>();
129         try {
130             Enumeration<NetworkInterface> en = NetworkInterface.getNetworkInterfaces();
131             while (en.hasMoreElements()) {
132                 NetworkInterface ni = (NetworkInterface) en.nextElement();
133                 Enumeration<InetAddress> ee = ni.getInetAddresses();
134                 while (ee.hasMoreElements()) {
135                     InetAddress ia = (InetAddress) ee.nextElement();
136                     allPossibleIps.add(ia.getHostAddress());
137                 }
138             }
139         } catch (SocketException e) {
140             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.CONNCECTIVITYERROR, ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR);
141         }catch(Exception e) {
142                 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), ErrorSeverity.ERROR, ErrorTypes.GENERALSERVICEERROR);
143         }
144         return allPossibleIps;
145     }
146
147     /**
148      * This method iterates through all available IP addresses and connects to multiple cassandra
149      * clusters.
150      */
151     private void connectToCassaCluster() {
152         Iterator<String> it = getAllPossibleLocalIps().iterator();
153         String address = "localhost";
154         logger.info(EELFLoggerDelegate.applicationLogger,
155                         "Connecting to cassa cluster: Iterating through possible ips:"
156                                         + getAllPossibleLocalIps());
157         while (it.hasNext()) {
158             try {
159                 cluster = Cluster.builder().withPort(9042)
160                                 .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
161                                 .addContactPoint(address).build();
162                 Metadata metadata = cluster.getMetadata();
163                 logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
164                                 + metadata.getClusterName() + " at " + address);
165                 session = cluster.connect();
166
167                 break;
168             } catch (NoHostAvailableException e) {
169                 address = it.next();
170                 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.HOSTUNAVAILABLE, ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR);
171             }
172         }
173     }
174
175     /**
176      * 
177      */
178     public void close() {
179         session.close();
180     }
181
182     /**
183      * This method connects to cassandra cluster on specific address.
184      * 
185      * @param address
186      */
187     private void connectToCassaCluster(String address) throws MusicServiceException {
188         cluster = Cluster.builder().withPort(9042)
189                         .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
190                         .addContactPoint(address).build();
191         Metadata metadata = cluster.getMetadata();
192         logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
193                         + metadata.getClusterName() + " at " + address);
194         try {
195             session = cluster.connect();
196         } catch (Exception ex) {
197             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.CASSANDRACONNECTIVITY, ErrorSeverity.ERROR, ErrorTypes.SERVICEUNAVAILABLE);
198             throw new MusicServiceException(
199                             "Error while connecting to Cassandra cluster.. " + ex.getMessage());
200         }
201     }
202
203     /**
204      * 
205      * @param keyspace
206      * @param tableName
207      * @param columnName
208      * @return DataType
209      */
210     public DataType returnColumnDataType(String keyspace, String tableName, String columnName) {
211         KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
212         TableMetadata table = ks.getTable(tableName);
213         return table.getColumn(columnName).getType();
214
215     }
216
217     /**
218      * 
219      * @param keyspace
220      * @param tableName
221      * @return TableMetadata
222      */
223     public TableMetadata returnColumnMetadata(String keyspace, String tableName) {
224         KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
225         return ks.getTable(tableName);
226     }
227
228
229     /**
230      * Utility function to return the Java specific object type.
231      * 
232      * @param row
233      * @param colName
234      * @param colType
235      * @return
236      */
237     public Object getColValue(Row row, String colName, DataType colType) {
238
239         switch (colType.getName()) {
240             case VARCHAR:
241                 return row.getString(colName);
242             case UUID:
243                 return row.getUUID(colName);
244             case VARINT:
245                 return row.getVarint(colName);
246             case BIGINT:
247                 return row.getLong(colName);
248             case INT:
249                 return row.getInt(colName);
250             case FLOAT:
251                 return row.getFloat(colName);
252             case DOUBLE:
253                 return row.getDouble(colName);
254             case BOOLEAN:
255                 return row.getBool(colName);
256             case MAP:
257                 return row.getMap(colName, String.class, String.class);
258             case LIST:
259                 return row.getList(colName, String.class);
260             default:
261                 return null;
262         }
263     }
264     
265     public byte[] getBlobValue(Row row, String colName, DataType colType) {
266         ByteBuffer bb = row.getBytes(colName);
267         byte[] data = bb.array();
268         return data;
269     }
270
271     public boolean doesRowSatisfyCondition(Row row, Map<String, Object> condition) throws Exception {
272         ColumnDefinitions colInfo = row.getColumnDefinitions();
273
274         for (Map.Entry<String, Object> entry : condition.entrySet()) {
275             String colName = entry.getKey();
276             DataType colType = colInfo.getType(colName);
277             Object columnValue = getColValue(row, colName, colType);
278             Object conditionValue = MusicUtil.convertToActualDataType(colType, entry.getValue());
279             if (columnValue.equals(conditionValue) == false)
280                 return false;
281         }
282         return true;
283     }
284
285     /**
286      * Utility function to store ResultSet values in to a MAP for output.
287      * 
288      * @param results
289      * @return MAP
290      */
291     public Map<String, HashMap<String, Object>> marshalData(ResultSet results) {
292         Map<String, HashMap<String, Object>> resultMap =
293                         new HashMap<String, HashMap<String, Object>>();
294         int counter = 0;
295         for (Row row : results) {
296             ColumnDefinitions colInfo = row.getColumnDefinitions();
297             HashMap<String, Object> resultOutput = new HashMap<String, Object>();
298             for (Definition definition : colInfo) {
299                 if (!definition.getName().equals("vector_ts")) {
300                         if(definition.getType().toString().toLowerCase().contains("blob")) {
301                                 resultOutput.put(definition.getName(),
302                                 getBlobValue(row, definition.getName(), definition.getType()));
303                         } 
304                         else
305                                 resultOutput.put(definition.getName(),
306                                     getColValue(row, definition.getName(), definition.getType()));
307                 }
308             }
309             resultMap.put("row " + counter, resultOutput);
310             counter++;
311         }
312         return resultMap;
313     }
314
315
316     // Prepared Statements 1802 additions
317     /**
318      * This Method performs DDL and DML operations on Cassandra using specified consistency level
319      * 
320      * @param queryObject Object containing cassandra prepared query and values.
321      * @param consistency Specify consistency level for data synchronization across cassandra
322      *        replicas
323      * @return Boolean Indicates operation success or failure
324      * @throws MusicServiceException
325      * @throws MusicQueryException
326      */
327     public boolean executePut(PreparedQueryObject queryObject, String consistency)
328                     throws MusicServiceException, MusicQueryException {
329
330         boolean result = false;
331
332         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
333                 logger.error(EELFLoggerDelegate.errorLogger, queryObject.getQuery(),AppMessages.QUERYERROR, ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
334             throw new MusicQueryException("Ill formed queryObject for the request = " + "["
335                             + queryObject.getQuery() + "]");
336         }
337         logger.info(EELFLoggerDelegate.applicationLogger,
338                         "In preprared Execute Put: the actual insert query:"
339                                         + queryObject.getQuery() + "; the values"
340                                         + queryObject.getValues());
341         PreparedStatement preparedInsert = null;
342         try {
343                 
344                                 preparedInsert = session.prepare(queryObject.getQuery());
345                         
346         } catch(InvalidQueryException iqe) {
347                 logger.error(EELFLoggerDelegate.errorLogger, iqe.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
348                 throw new MusicQueryException(iqe.getMessage());
349         }catch(Exception e) {
350                 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
351                 throw new MusicQueryException(e.getMessage());
352         }
353         
354         try {
355             if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
356                 logger.info(EELFLoggerDelegate.applicationLogger, "Executing critical put query");
357                 preparedInsert.setConsistencyLevel(ConsistencyLevel.QUORUM);
358             } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
359                 logger.info(EELFLoggerDelegate.applicationLogger, "Executing simple put query");
360                 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
361             }
362
363             ResultSet rs = session.execute(preparedInsert.bind(queryObject.getValues().toArray()));
364             result = rs.wasApplied();
365
366         }
367         catch (AlreadyExistsException ae) {
368             logger.error(EELFLoggerDelegate.errorLogger, ae.getMessage(),AppMessages.SESSIONFAILED+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
369                 throw new MusicServiceException(ae.getMessage());
370         }
371         catch (Exception e) {
372                 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.SESSIONFAILED+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
373                 throw new MusicQueryException("Executing Session Failure for Request = " + "["
374                             + queryObject.getQuery() + "]" + " Reason = " + e.getMessage());
375         }
376
377
378         return result;
379     }
380
381     /**
382      * This method performs DDL operations on Cassandra using consistency level ONE.
383      * 
384      * @param queryObject Object containing cassandra prepared query and values.
385      * @return ResultSet
386      * @throws MusicServiceException
387      * @throws MusicQueryException
388      */
389     public ResultSet executeEventualGet(PreparedQueryObject queryObject)
390                     throws MusicServiceException, MusicQueryException {
391
392         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
393                 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
394                 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
395                             + queryObject.getQuery() + "]");
396         }
397         logger.info(EELFLoggerDelegate.applicationLogger,
398                         "Executing Eventual  get query:" + queryObject.getQuery());
399        
400         ResultSet results = null;
401         try {
402                  PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
403              preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE);
404              results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
405
406         } catch (Exception ex) {
407                 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
408                 throw new MusicServiceException(ex.getMessage());
409         }
410         return results;
411     }
412
413     /**
414      * 
415      * This method performs DDL operation on Cassandra using consistency level QUORUM.
416      * 
417      * @param queryObject Object containing cassandra prepared query and values.
418      * @return ResultSet
419      * @throws MusicServiceException
420      * @throws MusicQueryException
421      */
422     public ResultSet executeCriticalGet(PreparedQueryObject queryObject)
423                     throws MusicServiceException, MusicQueryException {
424         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
425                 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
426             throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
427                             + queryObject.getQuery() + "]");
428         }
429         logger.info(EELFLoggerDelegate.applicationLogger,
430                         "Executing Critical get query:" + queryObject.getQuery());
431         PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
432         preparedEventualGet.setConsistencyLevel(ConsistencyLevel.QUORUM);
433         ResultSet results = null;
434         try {
435             results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
436         } catch (Exception ex) {
437                 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
438                 throw new MusicServiceException(ex.getMessage());
439         }
440         return results;
441
442     }
443
444 }