Push various changes
[music.git] / jar / src / main / java / org / onap / music / datastore / MusicDataStore.java
1 /*
2  * ============LICENSE_START==========================================
3  * org.onap.music
4  * ===================================================================
5  *  Copyright (c) 2017 AT&T Intellectual Property
6  * ===================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  * 
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  * 
19  * ============LICENSE_END=============================================
20  * ====================================================================
21  */
22
23 package org.onap.music.datastore;
24
25 import java.net.InetAddress;
26 import java.net.NetworkInterface;
27 import java.net.SocketException;
28 import java.util.ArrayList;
29 import java.util.Enumeration;
30 import java.util.HashMap;
31 import java.util.Iterator;
32 import java.util.Map;
33 import org.onap.music.eelf.logging.EELFLoggerDelegate;
34 import org.onap.music.eelf.logging.format.AppMessages;
35 import org.onap.music.eelf.logging.format.ErrorSeverity;
36 import org.onap.music.eelf.logging.format.ErrorTypes;
37 import org.onap.music.exceptions.MusicQueryException;
38 import org.onap.music.exceptions.MusicServiceException;
39 import org.onap.music.main.MusicUtil;
40 import com.datastax.driver.core.Cluster;
41 import com.datastax.driver.core.ColumnDefinitions;
42 import com.datastax.driver.core.ColumnDefinitions.Definition;
43 import com.datastax.driver.core.ConsistencyLevel;
44 import com.datastax.driver.core.DataType;
45 import com.datastax.driver.core.KeyspaceMetadata;
46 import com.datastax.driver.core.Metadata;
47 import com.datastax.driver.core.PreparedStatement;
48 import com.datastax.driver.core.ResultSet;
49 import com.datastax.driver.core.Row;
50 import com.datastax.driver.core.Session;
51 import com.datastax.driver.core.TableMetadata;
52 import com.datastax.driver.core.exceptions.AlreadyExistsException;
53 import com.datastax.driver.core.exceptions.InvalidQueryException;
54 import com.datastax.driver.core.exceptions.NoHostAvailableException;
55 import com.datastax.driver.core.policies.RoundRobinPolicy;
56 import com.datastax.driver.core.HostDistance;
57 import com.datastax.driver.core.PoolingOptions;
58
59
60 /**
61  * @author nelson24
62  *
63  */
64 public class MusicDataStore {
65
66     private Session session;
67     private Cluster cluster;
68
69
70
71     /**
72      * @param session
73      */
74     public void setSession(Session session) {
75         this.session = session;
76     }
77     
78     /**
79      * @param session
80      */
81     public Session getSession() {
82         return session;
83     }
84
85     /**
86      * @param cluster
87      */
88     public void setCluster(Cluster cluster) {
89         this.cluster = cluster;
90     }
91
92
93
94     private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class);
95
96     /**
97      * 
98      */
99     public MusicDataStore() {
100         connectToCassaCluster();
101     }
102
103
104     /**
105      * @param cluster
106      * @param session
107      */
108     public MusicDataStore(Cluster cluster, Session session) {
109         this.session = session;
110         this.cluster = cluster;
111     }
112
113     /**
114      * 
115      * @param remoteIp
116      * @throws MusicServiceException
117      */
118     public MusicDataStore(String remoteIp) {
119         try {
120             connectToCassaCluster(remoteIp);
121         } catch (MusicServiceException e) {
122             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage());
123         }
124     }
125
126     /**
127      * 
128      * @return
129      */
130     private ArrayList<String> getAllPossibleLocalIps() {
131         ArrayList<String> allPossibleIps = new ArrayList<String>();
132         try {
133             Enumeration<NetworkInterface> en = NetworkInterface.getNetworkInterfaces();
134             while (en.hasMoreElements()) {
135                 NetworkInterface ni = (NetworkInterface) en.nextElement();
136                 Enumeration<InetAddress> ee = ni.getInetAddresses();
137                 while (ee.hasMoreElements()) {
138                     InetAddress ia = (InetAddress) ee.nextElement();
139                     allPossibleIps.add(ia.getHostAddress());
140                 }
141             }
142         } catch (SocketException e) {
143             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.CONNCECTIVITYERROR, ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR);
144         }catch(Exception e) {
145             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), ErrorSeverity.ERROR, ErrorTypes.GENERALSERVICEERROR);
146         }
147         return allPossibleIps;
148     }
149
150     /**
151      * This method iterates through all available IP addresses and connects to multiple cassandra
152      * clusters.
153      */
154     private void connectToCassaCluster() {
155         Iterator<String> it = getAllPossibleLocalIps().iterator();
156         String address = "localhost";
157         String[] addresses = null;
158         address = MusicUtil.getMyCassaHost();
159         addresses = address.split(",");
160         
161         logger.info(EELFLoggerDelegate.applicationLogger,
162                         "Connecting to cassa cluster: Iterating through possible ips:"
163                                         + getAllPossibleLocalIps());
164         PoolingOptions poolingOptions = new PoolingOptions();
165         poolingOptions
166         .setConnectionsPerHost(HostDistance.LOCAL,  4, 10)
167         .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
168         while (it.hasNext()) {
169             try {
170                 if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
171                     logger.info(EELFLoggerDelegate.applicationLogger,
172                             "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd());
173                     cluster = Cluster.builder().withPort(9042)
174                                        .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
175                                        //.withLoadBalancingPolicy(new RoundRobinPolicy())
176                                        .withPoolingOptions(poolingOptions)
177                                        .addContactPoints(addresses).build();
178                 }
179                 else
180                     cluster = Cluster.builder().withPort(9042)
181                                          //.withLoadBalancingPolicy(new RoundRobinPolicy())
182                                          .addContactPoints(addresses).build();
183                 
184                 Metadata metadata = cluster.getMetadata();
185                 logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
186                                 + metadata.getClusterName() + " at " + address);
187                 session = cluster.connect();
188
189                 break;
190             } catch (NoHostAvailableException e) {
191                 address = it.next();
192                 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.HOSTUNAVAILABLE, ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR);
193             }
194         }
195     }
196
197     /**
198      * 
199      */
200     public void close() {
201         session.close();
202     }
203
204     /**
205      * This method connects to cassandra cluster on specific address.
206      * 
207      * @param address
208      */
209     private void connectToCassaCluster(String address) throws MusicServiceException {
210         String[] addresses = null;
211         addresses = address.split(",");
212         PoolingOptions poolingOptions = new PoolingOptions();
213         poolingOptions
214         .setConnectionsPerHost(HostDistance.LOCAL,  4, 10)
215         .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
216         if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
217             logger.info(EELFLoggerDelegate.applicationLogger,
218                     "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd());
219             cluster = Cluster.builder().withPort(9042)
220                        .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
221                        //.withLoadBalancingPolicy(new RoundRobinPolicy())
222                        .withPoolingOptions(poolingOptions)
223                        .addContactPoints(addresses).build();
224         }
225         else {
226             cluster = Cluster.builder().withPort(9042)
227                         //.withLoadBalancingPolicy(new RoundRobinPolicy())
228                         .withPoolingOptions(poolingOptions)
229                         .addContactPoints(addresses).build();
230         }
231         Metadata metadata = cluster.getMetadata();
232         logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
233                         + metadata.getClusterName() + " at " + address);
234         try {
235             session = cluster.connect();
236         } catch (Exception ex) {
237             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.CASSANDRACONNECTIVITY, ErrorSeverity.ERROR, ErrorTypes.SERVICEUNAVAILABLE);
238             throw new MusicServiceException(
239                             "Error while connecting to Cassandra cluster.. " + ex.getMessage());
240         }
241     }
242
243     /**
244      * 
245      * @param keyspace
246      * @param tableName
247      * @param columnName
248      * @return DataType
249      */
250     public DataType returnColumnDataType(String keyspace, String tableName, String columnName) {
251         KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
252         TableMetadata table = ks.getTable(tableName);
253         return table.getColumn(columnName).getType();
254
255     }
256
257     /**
258      * 
259      * @param keyspace
260      * @param tableName
261      * @return TableMetadata
262      */
263     public TableMetadata returnColumnMetadata(String keyspace, String tableName) {
264         KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
265         return ks.getTable(tableName);
266     }
267
268
269     /**
270      * Utility function to return the Java specific object type.
271      * 
272      * @param row
273      * @param colName
274      * @param colType
275      * @return
276      */
277     public Object getColValue(Row row, String colName, DataType colType) {
278
279         switch (colType.getName()) {
280             case VARCHAR:
281                 return row.getString(colName);
282             case UUID:
283                 return row.getUUID(colName);
284             case VARINT:
285                 return row.getVarint(colName);
286             case BIGINT:
287                 return row.getLong(colName);
288             case INT:
289                 return row.getInt(colName);
290             case FLOAT:
291                 return row.getFloat(colName);
292             case DOUBLE:
293                 return row.getDouble(colName);
294             case BOOLEAN:
295                 return row.getBool(colName);
296             case MAP:
297                 return row.getMap(colName, String.class, String.class);
298             default:
299                 return null;
300         }
301     }
302
303     public boolean doesRowSatisfyCondition(Row row, Map<String, Object> condition) throws Exception {
304         ColumnDefinitions colInfo = row.getColumnDefinitions();
305
306         for (Map.Entry<String, Object> entry : condition.entrySet()) {
307             String colName = entry.getKey();
308             DataType colType = colInfo.getType(colName);
309             Object columnValue = getColValue(row, colName, colType);
310             Object conditionValue = MusicUtil.convertToActualDataType(colType, entry.getValue());
311             if (columnValue.equals(conditionValue) == false)
312                 return false;
313         }
314         return true;
315     }
316
317     /**
318      * Utility function to store ResultSet values in to a MAP for output.
319      * 
320      * @param results
321      * @return MAP
322      */
323     public Map<String, HashMap<String, Object>> marshalData(ResultSet results) {
324         Map<String, HashMap<String, Object>> resultMap =
325                         new HashMap<String, HashMap<String, Object>>();
326         int counter = 0;
327         for (Row row : results) {
328             ColumnDefinitions colInfo = row.getColumnDefinitions();
329             HashMap<String, Object> resultOutput = new HashMap<String, Object>();
330             for (Definition definition : colInfo) {
331                 if (!definition.getName().equals("vector_ts"))
332                     resultOutput.put(definition.getName(),
333                                     getColValue(row, definition.getName(), definition.getType()));
334             }
335             resultMap.put("row " + counter, resultOutput);
336             counter++;
337         }
338         return resultMap;
339     }
340
341
342     // Prepared Statements 1802 additions
343     /**
344      * This Method performs DDL and DML operations on Cassandra using specified consistency level
345      * 
346      * @param queryObject Object containing cassandra prepared query and values.
347      * @param consistency Specify consistency level for data synchronization across cassandra
348      *        replicas
349      * @return Boolean Indicates operation success or failure
350      * @throws MusicServiceException
351      * @throws MusicQueryException
352      */
353     public boolean executePut(PreparedQueryObject queryObject, String consistency)
354                     throws MusicServiceException, MusicQueryException {
355
356         boolean result = false;
357
358         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
359             logger.error(EELFLoggerDelegate.errorLogger, queryObject.getQuery(),AppMessages.QUERYERROR, ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
360             throw new MusicQueryException("Ill formed queryObject for the request = " + "["
361                             + queryObject.getQuery() + "]");
362         }
363         logger.info(EELFLoggerDelegate.applicationLogger,
364                         "In preprared Execute Put: the actual insert query:"
365                                         + queryObject.getQuery() + "; the values"
366                                         + queryObject.getValues());
367         PreparedStatement preparedInsert = null;
368         try {
369             
370                 preparedInsert = session.prepare(queryObject.getQuery());
371             
372         } catch(InvalidQueryException iqe) {
373             logger.error(EELFLoggerDelegate.errorLogger, iqe.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
374             throw new MusicQueryException(iqe.getMessage());
375         }catch(Exception e) {
376             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
377             throw new MusicQueryException(e.getMessage());
378         }
379         
380         try {
381             if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
382                 logger.info(EELFLoggerDelegate.applicationLogger, "Executing critical put query");
383                 preparedInsert.setConsistencyLevel(ConsistencyLevel.QUORUM);
384             } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
385                 logger.info(EELFLoggerDelegate.applicationLogger, "Executing simple put query");
386                 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
387             }
388
389             ResultSet rs = session.execute(preparedInsert.bind(queryObject.getValues().toArray()));
390             result = rs.wasApplied();
391
392         }
393         catch (AlreadyExistsException ae) {
394             logger.error(EELFLoggerDelegate.errorLogger, ae.getMessage(),AppMessages.SESSIONFAILED+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
395             throw new MusicServiceException(ae.getMessage());
396         }
397         catch (Exception e) {
398             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.SESSIONFAILED+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
399             throw new MusicQueryException("Executing Session Failure for Request = " + "["
400                             + queryObject.getQuery() + "]" + " Reason = " + e.getMessage());
401         }
402
403
404         return result;
405     }
406
407     /**
408      * This method performs DDL operations on Cassandra using consistency level ONE.
409      * 
410      * @param queryObject Object containing cassandra prepared query and values.
411      * @return ResultSet
412      * @throws MusicServiceException
413      * @throws MusicQueryException
414      */
415     public ResultSet executeEventualGet(PreparedQueryObject queryObject)
416                     throws MusicServiceException, MusicQueryException {
417
418         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
419             logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
420             throw new MusicQueryException("Ill formed queryObject for the request = " + "["
421                             + queryObject.getQuery() + "]");
422         }
423         logger.info(EELFLoggerDelegate.applicationLogger,
424                         "Executing Eventual  get query:" + queryObject.getQuery());
425        
426         ResultSet results = null;
427         try {
428              PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
429              preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE);
430              results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
431
432         } catch (Exception ex) {
433             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
434             throw new MusicServiceException(ex.getMessage());
435         }
436         return results;
437     }
438
439     /**
440      * 
441      * This method performs DDL operation on Cassandra using consistency level QUORUM.
442      * 
443      * @param queryObject Object containing cassandra prepared query and values.
444      * @return ResultSet
445      * @throws MusicServiceException
446      * @throws MusicQueryException
447      */
448     public ResultSet executeCriticalGet(PreparedQueryObject queryObject)
449                     throws MusicServiceException, MusicQueryException {
450         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
451             logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
452             throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
453                             + queryObject.getQuery() + "]");
454         }
455         logger.info(EELFLoggerDelegate.applicationLogger,
456                         "Executing Critical get query:" + queryObject.getQuery());
457         PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
458         preparedEventualGet.setConsistencyLevel(ConsistencyLevel.QUORUM);
459         ResultSet results = null;
460         try {
461             results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
462         } catch (Exception ex) {
463             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
464             throw new MusicServiceException(ex.getMessage());
465         }
466         return results;
467
468     }
469
470 }