Applying bug fixes
[music.git] / src / main / java / org / onap / music / datastore / MusicDataStore.java
1 /*
2  * ============LICENSE_START==========================================
3  * org.onap.music
4  * ===================================================================
5  *  Copyright (c) 2017 AT&T Intellectual Property
6  * ===================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  * 
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  * 
19  * ============LICENSE_END=============================================
20  * ====================================================================
21  */
22 package org.onap.music.datastore;
23
24 import java.net.InetAddress;
25 import java.net.NetworkInterface;
26 import java.net.SocketException;
27 import java.util.ArrayList;
28 import java.util.Enumeration;
29 import java.util.HashMap;
30 import java.util.Iterator;
31 import java.util.Map;
32 import org.onap.music.eelf.logging.EELFLoggerDelegate;
33 import org.onap.music.exceptions.MusicQueryException;
34 import org.onap.music.exceptions.MusicServiceException;
35 import org.onap.music.main.MusicUtil;
36 import com.datastax.driver.core.Cluster;
37 import com.datastax.driver.core.ColumnDefinitions;
38 import com.datastax.driver.core.ColumnDefinitions.Definition;
39 import com.datastax.driver.core.ConsistencyLevel;
40 import com.datastax.driver.core.DataType;
41 import com.datastax.driver.core.KeyspaceMetadata;
42 import com.datastax.driver.core.Metadata;
43 import com.datastax.driver.core.PreparedStatement;
44 import com.datastax.driver.core.ResultSet;
45 import com.datastax.driver.core.Row;
46 import com.datastax.driver.core.Session;
47 import com.datastax.driver.core.TableMetadata;
48 import com.datastax.driver.core.exceptions.NoHostAvailableException;
49
50 /**
51  * @author nelson24
52  *
53  */
54 public class MusicDataStore {
55
56     private Session session;
57     private Cluster cluster;
58
59
60
61     /**
62      * @param session
63      */
64     public void setSession(Session session) {
65         this.session = session;
66     }
67
68     /**
69      * @param cluster
70      */
71     public void setCluster(Cluster cluster) {
72         this.cluster = cluster;
73     }
74
75
76
77     private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class);
78
79     /**
80      * 
81      */
82     public MusicDataStore() {
83         connectToCassaCluster();
84     }
85
86
87     /**
88      * @param cluster
89      * @param session
90      */
91     public MusicDataStore(Cluster cluster, Session session) {
92         this.session = session;
93         this.cluster = cluster;
94     }
95
96     /**
97      * 
98      * @param remoteIp
99      * @throws MusicServiceException
100      */
101     public MusicDataStore(String remoteIp) {
102         try {
103             connectToCassaCluster(remoteIp);
104         } catch (MusicServiceException e) {
105             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage());
106         }
107     }
108
109     /**
110      * 
111      * @return
112      */
113     private ArrayList<String> getAllPossibleLocalIps() {
114         ArrayList<String> allPossibleIps = new ArrayList<String>();
115         try {
116             Enumeration<NetworkInterface> en = NetworkInterface.getNetworkInterfaces();
117             while (en.hasMoreElements()) {
118                 NetworkInterface ni = (NetworkInterface) en.nextElement();
119                 Enumeration<InetAddress> ee = ni.getInetAddresses();
120                 while (ee.hasMoreElements()) {
121                     InetAddress ia = (InetAddress) ee.nextElement();
122                     allPossibleIps.add(ia.getHostAddress());
123                 }
124             }
125         } catch (SocketException e) {
126             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage());
127         }
128         return allPossibleIps;
129     }
130
131     /**
132      * This method iterates through all available IP addresses and connects to multiple cassandra
133      * clusters.
134      */
135     private void connectToCassaCluster() {
136         Iterator<String> it = getAllPossibleLocalIps().iterator();
137         String address = "localhost";
138         logger.info(EELFLoggerDelegate.applicationLogger,
139                         "Connecting to cassa cluster: Iterating through possible ips:"
140                                         + getAllPossibleLocalIps());
141         while (it.hasNext()) {
142             try {
143                 cluster = Cluster.builder().withPort(9042)
144                                 .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
145                                 .addContactPoint(address).build();
146                 Metadata metadata = cluster.getMetadata();
147                 logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
148                                 + metadata.getClusterName() + " at " + address);
149                 session = cluster.connect();
150
151                 break;
152             } catch (NoHostAvailableException e) {
153                 address = it.next();
154                 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage());
155             }
156         }
157     }
158
159     /**
160      * 
161      */
162     public void close() {
163         session.close();
164     }
165
166     /**
167      * This method connects to cassandra cluster on specific address.
168      * 
169      * @param address
170      */
171     private void connectToCassaCluster(String address) throws MusicServiceException {
172         cluster = Cluster.builder().withPort(9042)
173                         .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
174                         .addContactPoint(address).build();
175         Metadata metadata = cluster.getMetadata();
176         logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
177                         + metadata.getClusterName() + " at " + address);
178         try {
179             session = cluster.connect();
180         } catch (Exception ex) {
181             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage());
182             throw new MusicServiceException(
183                             "Error while connecting to Cassandra cluster.. " + ex.getMessage());
184         }
185     }
186
187     /**
188      * 
189      * @param keyspace
190      * @param tableName
191      * @param columnName
192      * @return DataType
193      */
194     public DataType returnColumnDataType(String keyspace, String tableName, String columnName) {
195         KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
196         TableMetadata table = ks.getTable(tableName);
197         return table.getColumn(columnName).getType();
198
199     }
200
201     /**
202      * 
203      * @param keyspace
204      * @param tableName
205      * @return TableMetadata
206      */
207     public TableMetadata returnColumnMetadata(String keyspace, String tableName) {
208         KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
209         return ks.getTable(tableName);
210     }
211
212
213     /**
214      * Utility function to return the Java specific object type.
215      * 
216      * @param row
217      * @param colName
218      * @param colType
219      * @return
220      */
221     public Object getColValue(Row row, String colName, DataType colType) {
222
223         switch (colType.getName()) {
224             case VARCHAR:
225                 return row.getString(colName);
226             case UUID:
227                 return row.getUUID(colName);
228             case VARINT:
229                 return row.getVarint(colName);
230             case BIGINT:
231                 return row.getLong(colName);
232             case INT:
233                 return row.getInt(colName);
234             case FLOAT:
235                 return row.getFloat(colName);
236             case DOUBLE:
237                 return row.getDouble(colName);
238             case BOOLEAN:
239                 return row.getBool(colName);
240             case MAP:
241                 return row.getMap(colName, String.class, String.class);
242             default:
243                 return null;
244         }
245     }
246
247     public boolean doesRowSatisfyCondition(Row row, Map<String, Object> condition) {
248         ColumnDefinitions colInfo = row.getColumnDefinitions();
249
250         for (Map.Entry<String, Object> entry : condition.entrySet()) {
251             String colName = entry.getKey();
252             DataType colType = colInfo.getType(colName);
253             Object columnValue = getColValue(row, colName, colType);
254             Object conditionValue = MusicUtil.convertToActualDataType(colType, entry.getValue());
255             if (columnValue.equals(conditionValue) == false)
256                 return false;
257         }
258         return true;
259     }
260
261     /**
262      * Utility function to store ResultSet values in to a MAP for output.
263      * 
264      * @param results
265      * @return MAP
266      */
267     public Map<String, HashMap<String, Object>> marshalData(ResultSet results) {
268         Map<String, HashMap<String, Object>> resultMap =
269                         new HashMap<String, HashMap<String, Object>>();
270         int counter = 0;
271         for (Row row : results) {
272             ColumnDefinitions colInfo = row.getColumnDefinitions();
273             HashMap<String, Object> resultOutput = new HashMap<String, Object>();
274             for (Definition definition : colInfo) {
275                 if (!definition.getName().equals("vector_ts"))
276                     resultOutput.put(definition.getName(),
277                                     getColValue(row, definition.getName(), definition.getType()));
278             }
279             resultMap.put("row " + counter, resultOutput);
280             counter++;
281         }
282         return resultMap;
283     }
284
285
286     // Prepared Statements 1802 additions
287     /**
288      * This Method performs DDL and DML operations on Cassandra using specified consistency level
289      * 
290      * @param queryObject Object containing cassandra prepared query and values.
291      * @param consistency Specify consistency level for data synchronization across cassandra
292      *        replicas
293      * @return Boolean Indicates operation success or failure
294      * @throws MusicServiceException
295      * @throws MusicQueryException
296      */
297     public boolean executePut(PreparedQueryObject queryObject, String consistency)
298                     throws MusicServiceException, MusicQueryException {
299
300         boolean result = false;
301
302         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
303             logger.error(EELFLoggerDelegate.errorLogger,
304                             "Error while processing prepared query object");
305             throw new MusicQueryException("Ill formed queryObject for the request = " + "["
306                             + queryObject.getQuery() + "]");
307         }
308         logger.info(EELFLoggerDelegate.applicationLogger,
309                         "In preprared Execute Put: the actual insert query:"
310                                         + queryObject.getQuery() + "; the values"
311                                         + queryObject.getValues());
312         PreparedStatement preparedInsert = session.prepare(queryObject.getQuery());
313         try {
314             if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
315                 logger.info(EELFLoggerDelegate.applicationLogger, "Executing critical put query");
316                 preparedInsert.setConsistencyLevel(ConsistencyLevel.QUORUM);
317             } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
318                 logger.info(EELFLoggerDelegate.applicationLogger, "Executing simple put query");
319                 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
320             }
321
322             ResultSet rs = session.execute(preparedInsert.bind(queryObject.getValues().toArray()));
323             result = rs.wasApplied();
324
325         } catch (Exception e) {
326             logger.error(EELFLoggerDelegate.errorLogger, "Executing Session Failure for Request = "
327                             + "[" + queryObject.getQuery() + "]" + " Reason = " + e.getMessage());
328             throw new MusicServiceException("Executing Session Failure for Request = " + "["
329                             + queryObject.getQuery() + "]" + " Reason = " + e.getMessage());
330         }
331
332
333         return result;
334     }
335
336     /**
337      * This method performs DDL operations on Cassandra using consistency level ONE.
338      * 
339      * @param queryObject Object containing cassandra prepared query and values.
340      * @return ResultSet
341      * @throws MusicServiceException
342      * @throws MusicQueryException
343      */
344     public ResultSet executeEventualGet(PreparedQueryObject queryObject)
345                     throws MusicServiceException, MusicQueryException {
346
347         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
348             throw new MusicQueryException("Ill formed queryObject for the request = " + "["
349                             + queryObject.getQuery() + "]");
350         }
351         logger.info(EELFLoggerDelegate.applicationLogger,
352                         "Executing Eventual  get query:" + queryObject.getQuery());
353         PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
354         preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE);
355         ResultSet results = null;
356         try {
357             results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
358
359         } catch (Exception ex) {
360             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage());
361             throw new MusicServiceException(ex.getMessage());
362         }
363         return results;
364     }
365
366     /**
367      * 
368      * This method performs DDL operation on Cassandra using consistency level QUORUM.
369      * 
370      * @param queryObject Object containing cassandra prepared query and values.
371      * @return ResultSet
372      * @throws MusicServiceException
373      * @throws MusicQueryException
374      */
375     public ResultSet executeCriticalGet(PreparedQueryObject queryObject)
376                     throws MusicServiceException, MusicQueryException {
377         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
378             logger.error(EELFLoggerDelegate.errorLogger, "Error processing Prepared Query Object");
379             throw new MusicQueryException("Ill formed queryObject for the request = " + "["
380                             + queryObject.getQuery() + "]");
381         }
382         logger.info(EELFLoggerDelegate.applicationLogger,
383                         "Executing Critical get query:" + queryObject.getQuery());
384         PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
385         preparedEventualGet.setConsistencyLevel(ConsistencyLevel.QUORUM);
386         ResultSet results = null;
387         try {
388             results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
389         } catch (Exception ex) {
390             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage());
391             throw new MusicServiceException(ex.getMessage());
392         }
393         return results;
394
395     }
396
397 }