Merge "Add distribution management to pom"
[music.git] / src / main / java / org / onap / music / datastore / MusicDataStore.java
1 /*
2  * ============LICENSE_START==========================================
3  * org.onap.music
4  * ===================================================================
5  *  Copyright (c) 2017 AT&T Intellectual Property
6  * ===================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  * 
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  * 
19  * ============LICENSE_END=============================================
20  * ====================================================================
21  */
22 package org.onap.music.datastore;
23
24 import java.net.InetAddress;
25 import java.net.NetworkInterface;
26 import java.net.SocketException;
27 import java.nio.ByteBuffer;
28 import java.util.ArrayList;
29 import java.util.Enumeration;
30 import java.util.HashMap;
31 import java.util.Iterator;
32 import java.util.Map;
33 import org.onap.music.eelf.logging.EELFLoggerDelegate;
34 import org.onap.music.eelf.logging.format.AppMessages;
35 import org.onap.music.eelf.logging.format.ErrorSeverity;
36 import org.onap.music.eelf.logging.format.ErrorTypes;
37 import org.onap.music.exceptions.MusicQueryException;
38 import org.onap.music.exceptions.MusicServiceException;
39 import org.onap.music.main.MusicUtil;
40 import com.datastax.driver.core.Cluster;
41 import com.datastax.driver.core.ColumnDefinitions;
42 import com.datastax.driver.core.ColumnDefinitions.Definition;
43 import com.datastax.driver.core.ConsistencyLevel;
44 import com.datastax.driver.core.DataType;
45 import com.datastax.driver.core.HostDistance;
46 import com.datastax.driver.core.KeyspaceMetadata;
47 import com.datastax.driver.core.Metadata;
48 import com.datastax.driver.core.PoolingOptions;
49 import com.datastax.driver.core.PreparedStatement;
50 import com.datastax.driver.core.ResultSet;
51 import com.datastax.driver.core.Row;
52 import com.datastax.driver.core.Session;
53 import com.datastax.driver.core.TableMetadata;
54 import com.datastax.driver.core.exceptions.AlreadyExistsException;
55 import com.datastax.driver.core.exceptions.InvalidQueryException;
56 import com.datastax.driver.core.exceptions.NoHostAvailableException;
57 import com.sun.jersey.core.util.Base64;
58
59 /**
60  * @author nelson24
61  *
62  */
63 public class MusicDataStore {
64
65     private Session session;
66     private Cluster cluster;
67
68
69
70     /**
71      * @param session
72      */
73     public void setSession(Session session) {
74         this.session = session;
75     }
76     
77     /**
78      * @param session
79      */
80     public Session getSession() {
81         return session;
82     }
83
84     /**
85      * @param cluster
86      */
87     public void setCluster(Cluster cluster) {
88         this.cluster = cluster;
89     }
90
91
92
93     private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class);
94
95     /**
96      * 
97      */
98     public MusicDataStore() {
99         connectToCassaCluster();
100     }
101
102
103     /**
104      * @param cluster
105      * @param session
106      */
107     public MusicDataStore(Cluster cluster, Session session) {
108         this.session = session;
109         this.cluster = cluster;
110     }
111
112     /**
113      * 
114      * @param remoteIp
115      * @throws MusicServiceException
116      */
117     public MusicDataStore(String remoteIp) {
118         try {
119             connectToCassaCluster(remoteIp);
120         } catch (MusicServiceException e) {
121             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage());
122         }
123     }
124
125     /**
126      * 
127      * @return
128      */
129     private ArrayList<String> getAllPossibleLocalIps() {
130         ArrayList<String> allPossibleIps = new ArrayList<String>();
131         try {
132             Enumeration<NetworkInterface> en = NetworkInterface.getNetworkInterfaces();
133             while (en.hasMoreElements()) {
134                 NetworkInterface ni = (NetworkInterface) en.nextElement();
135                 Enumeration<InetAddress> ee = ni.getInetAddresses();
136                 while (ee.hasMoreElements()) {
137                     InetAddress ia = (InetAddress) ee.nextElement();
138                     allPossibleIps.add(ia.getHostAddress());
139                 }
140             }
141         } catch (SocketException e) {
142             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.CONNCECTIVITYERROR, ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR);
143         }catch(Exception e) {
144                 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), ErrorSeverity.ERROR, ErrorTypes.GENERALSERVICEERROR);
145         }
146         return allPossibleIps;
147     }
148
149     /**
150      * This method iterates through all available IP addresses and connects to multiple cassandra
151      * clusters.
152      */
153     private void connectToCassaCluster() {
154         Iterator<String> it = getAllPossibleLocalIps().iterator();
155         String address = "localhost";
156         String[] addresses = null;
157         address = MusicUtil.getMyCassaHost();
158                 addresses = address.split(",");
159                 
160         logger.info(EELFLoggerDelegate.applicationLogger,
161                         "Connecting to cassa cluster: Iterating through possible ips:"
162                                         + getAllPossibleLocalIps());
163         PoolingOptions poolingOptions = new PoolingOptions();
164         poolingOptions
165         .setConnectionsPerHost(HostDistance.LOCAL,  4, 10)
166         .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
167         while (it.hasNext()) {
168             try {
169                 if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
170                         logger.info(EELFLoggerDelegate.applicationLogger,
171                                         "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd());
172                         cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
173                                            .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
174                                            //.withLoadBalancingPolicy(new RoundRobinPolicy())
175                                            .withPoolingOptions(poolingOptions)
176                                            .addContactPoints(addresses).build();
177                 }
178                 else
179                         cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
180                                                                 //.withLoadBalancingPolicy(new RoundRobinPolicy())
181                                                                 .addContactPoints(addresses).build();
182                 
183                 Metadata metadata = cluster.getMetadata();
184                 logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
185                                 + metadata.getClusterName() + " at " + address);
186                 session = cluster.connect();
187
188                 break;
189             } catch (NoHostAvailableException e) {
190                 address = it.next();
191                 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.HOSTUNAVAILABLE, ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR);
192             }
193         }
194     }
195
196     /**
197      * 
198      */
199     public void close() {
200         session.close();
201     }
202
203     /**
204      * This method connects to cassandra cluster on specific address.
205      * 
206      * @param address
207      */
208     private void connectToCassaCluster(String address) throws MusicServiceException {
209         String[] addresses = null;
210                 addresses = address.split(",");
211                 PoolingOptions poolingOptions = new PoolingOptions();
212         poolingOptions
213         .setConnectionsPerHost(HostDistance.LOCAL,  4, 10)
214         .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
215         if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
216                 logger.info(EELFLoggerDelegate.applicationLogger,
217                                 "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd());
218                 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
219                            .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
220                            //.withLoadBalancingPolicy(new RoundRobinPolicy())
221                            .withPoolingOptions(poolingOptions)
222                            .addContactPoints(addresses).build();
223         }
224         else {
225                 cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
226                                         //.withLoadBalancingPolicy(new RoundRobinPolicy())
227                                         .withPoolingOptions(poolingOptions)
228                                         .addContactPoints(addresses).build();
229         }
230         Metadata metadata = cluster.getMetadata();
231         logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
232                         + metadata.getClusterName() + " at " + address);
233         try {
234             session = cluster.connect();
235         } catch (Exception ex) {
236             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.CASSANDRACONNECTIVITY, ErrorSeverity.ERROR, ErrorTypes.SERVICEUNAVAILABLE);
237             throw new MusicServiceException(
238                             "Error while connecting to Cassandra cluster.. " + ex.getMessage());
239         }
240     }
241
242     /**
243      * 
244      * @param keyspace
245      * @param tableName
246      * @param columnName
247      * @return DataType
248      */
249     public DataType returnColumnDataType(String keyspace, String tableName, String columnName) {
250         KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
251         TableMetadata table = ks.getTable(tableName);
252         return table.getColumn(columnName).getType();
253
254     }
255
256     /**
257      * 
258      * @param keyspace
259      * @param tableName
260      * @return TableMetadata
261      */
262     public TableMetadata returnColumnMetadata(String keyspace, String tableName) {
263         KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
264         return ks.getTable(tableName);
265     }
266
267
268     /**
269      * Utility function to return the Java specific object type.
270      * 
271      * @param row
272      * @param colName
273      * @param colType
274      * @return
275      */
276     public Object getColValue(Row row, String colName, DataType colType) {
277
278         switch (colType.getName()) {
279             case VARCHAR:
280                 return row.getString(colName);
281             case UUID:
282                 return row.getUUID(colName);
283             case VARINT:
284                 return row.getVarint(colName);
285             case BIGINT:
286                 return row.getLong(colName);
287             case INT:
288                 return row.getInt(colName);
289             case FLOAT:
290                 return row.getFloat(colName);
291             case DOUBLE:
292                 return row.getDouble(colName);
293             case BOOLEAN:
294                 return row.getBool(colName);
295             case MAP:
296                 return row.getMap(colName, String.class, String.class);
297             case LIST:
298                 return row.getList(colName, String.class);
299             default:
300                 return null;
301         }
302     }
303     
304     public byte[] getBlobValue(Row row, String colName, DataType colType) {
305         ByteBuffer bb = row.getBytes(colName);
306         byte[] data = bb.array();
307         return data;
308     }
309
310     public boolean doesRowSatisfyCondition(Row row, Map<String, Object> condition) throws Exception {
311         ColumnDefinitions colInfo = row.getColumnDefinitions();
312
313         for (Map.Entry<String, Object> entry : condition.entrySet()) {
314             String colName = entry.getKey();
315             DataType colType = colInfo.getType(colName);
316             Object columnValue = getColValue(row, colName, colType);
317             Object conditionValue = MusicUtil.convertToActualDataType(colType, entry.getValue());
318             if (columnValue.equals(conditionValue))
319                 return false;
320         }
321         return true;
322     }
323
324     /**
325      * Utility function to store ResultSet values in to a MAP for output.
326      * 
327      * @param results
328      * @return MAP
329      */
330     public Map<String, HashMap<String, Object>> marshalData(ResultSet results) {
331         Map<String, HashMap<String, Object>> resultMap =
332                         new HashMap<String, HashMap<String, Object>>();
333         int counter = 0;
334         for (Row row : results) {
335             ColumnDefinitions colInfo = row.getColumnDefinitions();
336             HashMap<String, Object> resultOutput = new HashMap<String, Object>();
337             for (Definition definition : colInfo) {
338                 if (!"vector_ts".equals(definition.getName())) {
339                         if(definition.getType().toString().toLowerCase().contains("blob")) {
340                                 resultOutput.put(definition.getName(),
341                                 getBlobValue(row, definition.getName(), definition.getType()));
342                         } 
343                         else
344                                 resultOutput.put(definition.getName(),
345                                     getColValue(row, definition.getName(), definition.getType()));
346                 }
347             }
348             resultMap.put("row " + counter, resultOutput);
349             counter++;
350         }
351         return resultMap;
352     }
353
354
355     // Prepared Statements 1802 additions
356     /**
357      * This Method performs DDL and DML operations on Cassandra using specified consistency level
358      * 
359      * @param queryObject Object containing cassandra prepared query and values.
360      * @param consistency Specify consistency level for data synchronization across cassandra
361      *        replicas
362      * @return Boolean Indicates operation success or failure
363      * @throws MusicServiceException
364      * @throws MusicQueryException
365      */
366     public boolean executePut(PreparedQueryObject queryObject, String consistency)
367                     throws MusicServiceException, MusicQueryException {
368
369         boolean result = false;
370
371         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
372                 logger.error(EELFLoggerDelegate.errorLogger, queryObject.getQuery(),AppMessages.QUERYERROR, ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
373             throw new MusicQueryException("Ill formed queryObject for the request = " + "["
374                             + queryObject.getQuery() + "]");
375         }
376         logger.info(EELFLoggerDelegate.applicationLogger,
377                         "In preprared Execute Put: the actual insert query:"
378                                         + queryObject.getQuery() + "; the values"
379                                         + queryObject.getValues());
380         PreparedStatement preparedInsert = null;
381         try {
382                 
383                                 preparedInsert = session.prepare(queryObject.getQuery());
384                         
385         } catch(InvalidQueryException iqe) {
386                 logger.error(EELFLoggerDelegate.errorLogger, iqe.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
387                 throw new MusicQueryException(iqe.getMessage());
388         }catch(Exception e) {
389                 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
390                 throw new MusicQueryException(e.getMessage());
391         }
392         
393         try {
394             if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
395                 logger.info(EELFLoggerDelegate.applicationLogger, "Executing critical put query");
396                 preparedInsert.setConsistencyLevel(ConsistencyLevel.QUORUM);
397             } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
398                 logger.info(EELFLoggerDelegate.applicationLogger, "Executing simple put query");
399                 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
400             }
401
402             ResultSet rs = session.execute(preparedInsert.bind(queryObject.getValues().toArray()));
403             result = rs.wasApplied();
404
405         }
406         catch (AlreadyExistsException ae) {
407             logger.error(EELFLoggerDelegate.errorLogger, ae.getMessage(),AppMessages.SESSIONFAILED+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
408                 throw new MusicServiceException(ae.getMessage());
409         }
410         catch (Exception e) {
411                 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.SESSIONFAILED+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
412                 throw new MusicQueryException("Executing Session Failure for Request = " + "["
413                             + queryObject.getQuery() + "]" + " Reason = " + e.getMessage());
414         }
415
416
417         return result;
418     }
419
420     /**
421      * This method performs DDL operations on Cassandra using consistency level ONE.
422      * 
423      * @param queryObject Object containing cassandra prepared query and values.
424      * @return ResultSet
425      * @throws MusicServiceException
426      * @throws MusicQueryException
427      */
428     public ResultSet executeEventualGet(PreparedQueryObject queryObject)
429                     throws MusicServiceException, MusicQueryException {
430
431         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
432                 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
433                 throw new MusicQueryException("Ill formed queryObject for the request = " + "["
434                             + queryObject.getQuery() + "]");
435         }
436         logger.info(EELFLoggerDelegate.applicationLogger,
437                         "Executing Eventual  get query:" + queryObject.getQuery());
438        
439         ResultSet results = null;
440         try {
441                  PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
442              preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE);
443              results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
444
445         } catch (Exception ex) {
446                 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
447                 throw new MusicServiceException(ex.getMessage());
448         }
449         return results;
450     }
451
452     /**
453      * 
454      * This method performs DDL operation on Cassandra using consistency level QUORUM.
455      * 
456      * @param queryObject Object containing cassandra prepared query and values.
457      * @return ResultSet
458      * @throws MusicServiceException
459      * @throws MusicQueryException
460      */
461     public ResultSet executeCriticalGet(PreparedQueryObject queryObject)
462                     throws MusicServiceException, MusicQueryException {
463         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
464                 logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
465             throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
466                             + queryObject.getQuery() + "]");
467         }
468         logger.info(EELFLoggerDelegate.applicationLogger,
469                         "Executing Critical get query:" + queryObject.getQuery());
470         PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
471         preparedEventualGet.setConsistencyLevel(ConsistencyLevel.QUORUM);
472         ResultSet results = null;
473         try {
474             results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
475         } catch (Exception ex) {
476                 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
477                 throw new MusicServiceException(ex.getMessage());
478         }
479         return results;
480
481     }
482
483 }