2fbca44405746b4b8da5eee0d58bc8560342e4f6
[music.git] / jar / src / main / java / org / onap / music / datastore / MusicDataStore.java
1 /*
2  * ============LICENSE_START==========================================
3  * org.onap.music
4  * ===================================================================
5  *  Copyright (c) 2017 AT&T Intellectual Property
6  * ===================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  * 
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  * 
19  * ============LICENSE_END=============================================
20  * ====================================================================
21  */
22
23 package org.onap.music.datastore;
24
25 import java.net.InetAddress;
26 import java.net.NetworkInterface;
27 import java.net.SocketException;
28 import java.util.ArrayList;
29 import java.util.Enumeration;
30 import java.util.HashMap;
31 import java.util.Iterator;
32 import java.util.Map;
33 import org.onap.music.eelf.logging.EELFLoggerDelegate;
34 import org.onap.music.eelf.logging.format.AppMessages;
35 import org.onap.music.eelf.logging.format.ErrorSeverity;
36 import org.onap.music.eelf.logging.format.ErrorTypes;
37 import org.onap.music.exceptions.MusicQueryException;
38 import org.onap.music.exceptions.MusicServiceException;
39 import org.onap.music.main.MusicUtil;
40 import com.datastax.driver.core.Cluster;
41 import com.datastax.driver.core.ColumnDefinitions;
42 import com.datastax.driver.core.ColumnDefinitions.Definition;
43 import com.datastax.driver.core.ConsistencyLevel;
44 import com.datastax.driver.core.DataType;
45 import com.datastax.driver.core.KeyspaceMetadata;
46 import com.datastax.driver.core.Metadata;
47 import com.datastax.driver.core.PreparedStatement;
48 import com.datastax.driver.core.ResultSet;
49 import com.datastax.driver.core.Row;
50 import com.datastax.driver.core.Session;
51 import com.datastax.driver.core.TableMetadata;
52 import com.datastax.driver.core.exceptions.AlreadyExistsException;
53 import com.datastax.driver.core.exceptions.InvalidQueryException;
54 import com.datastax.driver.core.exceptions.NoHostAvailableException;
55 import com.datastax.driver.core.policies.RoundRobinPolicy;
56 import com.datastax.driver.core.HostDistance;
57 import com.datastax.driver.core.PoolingOptions;
58 import org.onap.music.main.MusicUtil;
59
60
61 /**
62  * @author nelson24
63  *
64  */
65 public class MusicDataStore {
66
67     private Session session;
68     private Cluster cluster;
69
70
71
72     /**
73      * @param session
74      */
75     public void setSession(Session session) {
76         this.session = session;
77     }
78     
79     /**
80      * @param session
81      */
82     public Session getSession() {
83         return session;
84     }
85
86     /**
87      * @param cluster
88      */
89     public void setCluster(Cluster cluster) {
90         this.cluster = cluster;
91     }
92
93
94
95     private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class);
96
97     /**
98      * 
99      */
100     public MusicDataStore() {
101         connectToCassaCluster();
102     }
103
104
105     /**
106      * @param cluster
107      * @param session
108      */
109     public MusicDataStore(Cluster cluster, Session session) {
110         this.session = session;
111         this.cluster = cluster;
112     }
113
114     /**
115      * 
116      * @param remoteIp
117      * @throws MusicServiceException
118      */
119     public MusicDataStore(String remoteIp) {
120         try {
121             connectToCassaCluster(remoteIp);
122         } catch (MusicServiceException e) {
123             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage());
124         }
125     }
126
127     /**
128      * 
129      * @return
130      */
131     private ArrayList<String> getAllPossibleLocalIps() {
132         ArrayList<String> allPossibleIps = new ArrayList<String>();
133         try {
134             Enumeration<NetworkInterface> en = NetworkInterface.getNetworkInterfaces();
135             while (en.hasMoreElements()) {
136                 NetworkInterface ni = (NetworkInterface) en.nextElement();
137                 Enumeration<InetAddress> ee = ni.getInetAddresses();
138                 while (ee.hasMoreElements()) {
139                     InetAddress ia = (InetAddress) ee.nextElement();
140                     allPossibleIps.add(ia.getHostAddress());
141                 }
142             }
143         } catch (SocketException e) {
144             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.CONNCECTIVITYERROR, ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR);
145         }catch(Exception e) {
146             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), ErrorSeverity.ERROR, ErrorTypes.GENERALSERVICEERROR);
147         }
148         return allPossibleIps;
149     }
150
151     /**
152      * This method iterates through all available IP addresses and connects to multiple cassandra
153      * clusters.
154      */
155     private void connectToCassaCluster() {
156         Iterator<String> it = getAllPossibleLocalIps().iterator();
157         String address = "localhost";
158         String[] addresses = null;
159         address = MusicUtil.getMyCassaHost();
160         addresses = address.split(",");
161         
162         logger.info(EELFLoggerDelegate.applicationLogger,
163                         "Connecting to cassa cluster: Iterating through possible ips:"
164                                         + getAllPossibleLocalIps());
165         PoolingOptions poolingOptions = new PoolingOptions();
166         poolingOptions
167         .setConnectionsPerHost(HostDistance.LOCAL,  4, 10)
168         .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
169         while (it.hasNext()) {
170             try {
171                 if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
172                     logger.info(EELFLoggerDelegate.applicationLogger,
173                             "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd());
174                     cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
175                                        .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
176                                        //.withLoadBalancingPolicy(new RoundRobinPolicy())
177                                        .withoutJMXReporting()
178                                        .withPoolingOptions(poolingOptions)
179                                        .addContactPoints(addresses).build();
180                 }
181                 else
182                     cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
183                                          //.withLoadBalancingPolicy(new RoundRobinPolicy())
184                                          .addContactPoints(addresses).build();
185                 
186                 Metadata metadata = cluster.getMetadata();
187                 logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
188                                 + metadata.getClusterName() + " at " + address);
189                 session = cluster.connect();
190
191                 break;
192             } catch (NoHostAvailableException e) {
193                 address = it.next();
194                 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.HOSTUNAVAILABLE, ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR);
195             }
196         }
197     }
198
199     /**
200      * 
201      */
202     public void close() {
203         session.close();
204     }
205
206     /**
207      * This method connects to cassandra cluster on specific address.
208      * 
209      * @param address
210      */
211     private void connectToCassaCluster(String address) throws MusicServiceException {
212         String[] addresses = null;
213         addresses = address.split(",");
214         PoolingOptions poolingOptions = new PoolingOptions();
215         poolingOptions
216         .setConnectionsPerHost(HostDistance.LOCAL,  4, 10)
217         .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
218         if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
219             logger.info(EELFLoggerDelegate.applicationLogger,
220                     "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd());
221             cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
222                        .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
223                        //.withLoadBalancingPolicy(new RoundRobinPolicy())
224                        .withoutJMXReporting()
225                        .withPoolingOptions(poolingOptions)
226                        .addContactPoints(addresses).build();
227         }
228         else {
229             cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
230                         //.withLoadBalancingPolicy(new RoundRobinPolicy())
231                         .withoutJMXReporting()
232                         .withPoolingOptions(poolingOptions)
233                         .addContactPoints(addresses).build();
234         }
235         Metadata metadata = cluster.getMetadata();
236         logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
237                         + metadata.getClusterName() + " at " + address);
238         try {
239             session = cluster.connect();
240         } catch (Exception ex) {
241             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.CASSANDRACONNECTIVITY, ErrorSeverity.ERROR, ErrorTypes.SERVICEUNAVAILABLE);
242             throw new MusicServiceException(
243                             "Error while connecting to Cassandra cluster.. " + ex.getMessage());
244         }
245     }
246
247     /**
248      * 
249      * @param keyspace
250      * @param tableName
251      * @param columnName
252      * @return DataType
253      */
254     public DataType returnColumnDataType(String keyspace, String tableName, String columnName) {
255         KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
256         TableMetadata table = ks.getTable(tableName);
257         return table.getColumn(columnName).getType();
258
259     }
260
261     /**
262      * 
263      * @param keyspace
264      * @param tableName
265      * @return TableMetadata
266      */
267     public TableMetadata returnColumnMetadata(String keyspace, String tableName) {
268         KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
269         return ks.getTable(tableName);
270     }
271
272
273     /**
274      * Utility function to return the Java specific object type.
275      * 
276      * @param row
277      * @param colName
278      * @param colType
279      * @return
280      */
281     public Object getColValue(Row row, String colName, DataType colType) {
282
283         switch (colType.getName()) {
284             case VARCHAR:
285                 return row.getString(colName);
286             case UUID:
287                 return row.getUUID(colName);
288             case VARINT:
289                 return row.getVarint(colName);
290             case BIGINT:
291                 return row.getLong(colName);
292             case INT:
293                 return row.getInt(colName);
294             case FLOAT:
295                 return row.getFloat(colName);
296             case DOUBLE:
297                 return row.getDouble(colName);
298             case BOOLEAN:
299                 return row.getBool(colName);
300             case MAP:
301                 return row.getMap(colName, String.class, String.class);
302             default:
303                 return null;
304         }
305     }
306
307     public boolean doesRowSatisfyCondition(Row row, Map<String, Object> condition) throws Exception {
308         ColumnDefinitions colInfo = row.getColumnDefinitions();
309
310         for (Map.Entry<String, Object> entry : condition.entrySet()) {
311             String colName = entry.getKey();
312             DataType colType = colInfo.getType(colName);
313             Object columnValue = getColValue(row, colName, colType);
314             Object conditionValue = MusicUtil.convertToActualDataType(colType, entry.getValue());
315             if (columnValue.equals(conditionValue) == false)
316                 return false;
317         }
318         return true;
319     }
320
321     /**
322      * Utility function to store ResultSet values in to a MAP for output.
323      * 
324      * @param results
325      * @return MAP
326      */
327     public Map<String, HashMap<String, Object>> marshalData(ResultSet results) {
328         Map<String, HashMap<String, Object>> resultMap =
329                         new HashMap<String, HashMap<String, Object>>();
330         int counter = 0;
331         for (Row row : results) {
332             ColumnDefinitions colInfo = row.getColumnDefinitions();
333             HashMap<String, Object> resultOutput = new HashMap<String, Object>();
334             for (Definition definition : colInfo) {
335                 if (!definition.getName().equals("vector_ts"))
336                     resultOutput.put(definition.getName(),
337                                     getColValue(row, definition.getName(), definition.getType()));
338             }
339             resultMap.put("row " + counter, resultOutput);
340             counter++;
341         }
342         return resultMap;
343     }
344
345
346     // Prepared Statements 1802 additions
347     /**
348      * This Method performs DDL and DML operations on Cassandra using specified consistency level
349      * 
350      * @param queryObject Object containing cassandra prepared query and values.
351      * @param consistency Specify consistency level for data synchronization across cassandra
352      *        replicas
353      * @return Boolean Indicates operation success or failure
354      * @throws MusicServiceException
355      * @throws MusicQueryException
356      */
357     public boolean executePut(PreparedQueryObject queryObject, String consistency)
358                     throws MusicServiceException, MusicQueryException {
359
360         boolean result = false;
361
362         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
363             logger.error(EELFLoggerDelegate.errorLogger, queryObject.getQuery(),AppMessages.QUERYERROR, ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
364             throw new MusicQueryException("Ill formed queryObject for the request = " + "["
365                             + queryObject.getQuery() + "]");
366         }
367         logger.info(EELFLoggerDelegate.applicationLogger,
368                         "In preprared Execute Put: the actual insert query:"
369                                         + queryObject.getQuery() + "; the values"
370                                         + queryObject.getValues());
371         PreparedStatement preparedInsert = null;
372         try {
373             
374                 preparedInsert = session.prepare(queryObject.getQuery());
375             
376         } catch(InvalidQueryException iqe) {
377             logger.error(EELFLoggerDelegate.errorLogger, iqe.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
378             throw new MusicQueryException(iqe.getMessage());
379         }catch(Exception e) {
380             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
381             throw new MusicQueryException(e.getMessage());
382         }
383         
384         try {
385             if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
386                 logger.info(EELFLoggerDelegate.applicationLogger, "Executing critical put query");
387                 preparedInsert.setConsistencyLevel(ConsistencyLevel.QUORUM);
388             } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
389                 logger.info(EELFLoggerDelegate.applicationLogger, "Executing simple put query");
390                 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
391             }
392
393             ResultSet rs = session.execute(preparedInsert.bind(queryObject.getValues().toArray()));
394             result = rs.wasApplied();
395
396         }
397         catch (AlreadyExistsException ae) {
398             logger.error(EELFLoggerDelegate.errorLogger, ae.getMessage(),AppMessages.SESSIONFAILED+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
399             throw new MusicServiceException(ae.getMessage());
400         }
401         catch (Exception e) {
402             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.SESSIONFAILED+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
403             throw new MusicQueryException("Executing Session Failure for Request = " + "["
404                             + queryObject.getQuery() + "]" + " Reason = " + e.getMessage());
405         }
406
407
408         return result;
409     }
410
411     /**
412      * This method performs DDL operations on Cassandra using consistency level ONE.
413      * 
414      * @param queryObject Object containing cassandra prepared query and values.
415      * @return ResultSet
416      * @throws MusicServiceException
417      * @throws MusicQueryException
418      */
419     public ResultSet executeEventualGet(PreparedQueryObject queryObject)
420                     throws MusicServiceException, MusicQueryException {
421
422         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
423             logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
424             throw new MusicQueryException("Ill formed queryObject for the request = " + "["
425                             + queryObject.getQuery() + "]");
426         }
427         logger.info(EELFLoggerDelegate.applicationLogger,
428                         "Executing Eventual  get query:" + queryObject.getQuery());
429        
430         ResultSet results = null;
431         try {
432              PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
433              preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE);
434              results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
435
436         } catch (Exception ex) {
437             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
438             throw new MusicServiceException(ex.getMessage());
439         }
440         return results;
441     }
442
443     /**
444      * 
445      * This method performs DDL operation on Cassandra using consistency level QUORUM.
446      * 
447      * @param queryObject Object containing cassandra prepared query and values.
448      * @return ResultSet
449      * @throws MusicServiceException
450      * @throws MusicQueryException
451      */
452     public ResultSet executeCriticalGet(PreparedQueryObject queryObject)
453                     throws MusicServiceException, MusicQueryException {
454         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
455             logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
456             throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
457                             + queryObject.getQuery() + "]");
458         }
459         logger.info(EELFLoggerDelegate.applicationLogger,
460                         "Executing Critical get query:" + queryObject.getQuery());
461         PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
462         preparedEventualGet.setConsistencyLevel(ConsistencyLevel.QUORUM);
463         ResultSet results = null;
464         try {
465             results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
466         } catch (Exception ex) {
467             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
468             throw new MusicServiceException(ex.getMessage());
469         }
470         return results;
471
472     }
473
474 }