Merge "Bug fixes, syncronization, and clean up daemon"
[music.git] / music-core / src / main / java / org / onap / music / datastore / MusicDataStore.java
index 5a65868..cb22c0f 100755 (executable)
 
 package org.onap.music.datastore;
 
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Enumeration;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 
 import org.onap.music.eelf.logging.EELFLoggerDelegate;
@@ -57,13 +51,11 @@ import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.SocketOptions;
 import com.datastax.driver.core.TableMetadata;
-import com.datastax.driver.core.TypeCodec;
 import com.datastax.driver.core.exceptions.AlreadyExistsException;
 import com.datastax.driver.core.exceptions.InvalidQueryException;
-import com.datastax.driver.core.exceptions.NoHostAvailableException;
 import com.datastax.driver.extras.codecs.enums.EnumNameCodec;
-import com.datastax.driver.extras.codecs.enums.EnumOrdinalCodec;
 
 /**
  * @author nelson24
@@ -73,10 +65,33 @@ public class MusicDataStore {
 
     public static final String CONSISTENCY_LEVEL_ONE = "ONE";
     public static final String CONSISTENCY_LEVEL_QUORUM = "QUORUM";
+    public static final String CONSISTENCY_LEVEL_LOCAL_QUORUM = "LOCAL_QUORUM";
     private Session session;
     private Cluster cluster;
 
 
+    /**
+     * Connect to default Cassandra address
+     */
+    public MusicDataStore() {
+        try {
+            connectToCassaCluster(MusicUtil.getMyCassaHost());
+        } catch (MusicServiceException e) {
+            logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), e);
+        }
+    }
+
+
+    /**
+     * @param cluster
+     * @param session
+     */
+    public MusicDataStore(Cluster cluster, Session session) {
+        this.session = session;
+        setCluster(cluster);
+    }
+
+
     /**
      * @param session
      */
@@ -95,9 +110,12 @@ public class MusicDataStore {
      * @param cluster
      */
     public void setCluster(Cluster cluster) {
+        EnumNameCodec<LockType> lockTypeCodec = new EnumNameCodec<LockType>(LockType.class);
+        cluster.getConfiguration().getCodecRegistry().register(lockTypeCodec);
+
         this.cluster = cluster;
     }
-    
+
     public Cluster getCluster() {
         return this.cluster;
     }
@@ -105,26 +123,6 @@ public class MusicDataStore {
 
     private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class);
 
-    /**
-     * Connect to default Cassandra address
-     */
-    public MusicDataStore() {
-        try {
-            connectToCassaCluster(MusicUtil.getMyCassaHost());
-        } catch (MusicServiceException e) {
-            logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), e);
-        }
-    }
-
-
-    /**
-     * @param cluster
-     * @param session
-     */
-    public MusicDataStore(Cluster cluster, Session session) {
-        this.session = session;
-        this.cluster = cluster;
-    }
 
     /**
      *
@@ -158,40 +156,44 @@ public class MusicDataStore {
         poolingOptions
         .setConnectionsPerHost(HostDistance.LOCAL,  4, 10)
         .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
-        
+
+        Cluster cluster;
         if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
             String cassPwd = CipherUtil.decryptPKC(MusicUtil.getCassPwd());
             logger.info(EELFLoggerDelegate.applicationLogger,
                     "Building with credentials "+MusicUtil.getCassName()+" & "+ MusicUtil.getCassPwd());
             cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
-                        .withCredentials(MusicUtil.getCassName(), cassPwd)
-                        //.withLoadBalancingPolicy(new RoundRobinPolicy())
-                        .withoutJMXReporting()
-                        .withPoolingOptions(poolingOptions)
-                        .addContactPoints(addresses).build();
+                    .withCredentials(MusicUtil.getCassName(), cassPwd)
+                    //.withLoadBalancingPolicy(new RoundRobinPolicy())
+                    .withoutJMXReporting()
+                    .withPoolingOptions(poolingOptions)
+                    .withSocketOptions(
+                            new SocketOptions().setConnectTimeoutMillis(MusicUtil.getCassandraConnectTimeOutMS())
+                            .setReadTimeoutMillis(MusicUtil.getCassandraReadTimeOutMS()))
+                    .addContactPoints(addresses).build();
         } else {
             cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
-                        .withoutJMXReporting()
-                        .withPoolingOptions(poolingOptions)
-                        .addContactPoints(addresses)
-                        .build();
+                    .withoutJMXReporting()
+                    .withPoolingOptions(poolingOptions)
+                    .withSocketOptions(new SocketOptions()
+                            .setConnectTimeoutMillis(MusicUtil.getCassandraConnectTimeOutMS())
+                            .setReadTimeoutMillis(MusicUtil.getCassandraReadTimeOutMS()))
+                    .addContactPoints(addresses)
+                    .build();
         }
-        
-        
-        Metadata metadata = cluster.getMetadata();
+
+        this.setCluster(cluster);
+        Metadata metadata = this.cluster.getMetadata();
         logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
-                        + metadata.getClusterName() + " at " + address);
-        
-        EnumNameCodec<LockType> lockTypeCodec = new EnumNameCodec<LockType>(LockType.class);
-        cluster.getConfiguration().getCodecRegistry().register(lockTypeCodec);
+                + metadata.getClusterName() + " at " + address);
 
         try {
-            session = cluster.connect();
+            session = this.cluster.connect();
         } catch (Exception ex) {
             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.CASSANDRACONNECTIVITY,
-                ErrorSeverity.ERROR, ErrorTypes.SERVICEUNAVAILABLE, ex);
+                    ErrorSeverity.ERROR, ErrorTypes.SERVICEUNAVAILABLE, ex);
             throw new MusicServiceException(
-                            "Error while connecting to Cassandra cluster.. " + ex.getMessage());
+                    "Error while connecting to Cassandra cluster.. " + ex.getMessage());
         }
     }
 
@@ -219,16 +221,16 @@ public class MusicDataStore {
         KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
         return ks.getTable(tableName);
     }
-    
+
     /**
-    *
-    * @param keyspace
-    * @param tableName
-    * @return TableMetadata
-    */
-   public KeyspaceMetadata returnKeyspaceMetadata(String keyspace) {
-       return cluster.getMetadata().getKeyspace(keyspace);
-   }
+     *
+     * @param keyspace
+     * @param tableName
+     * @return TableMetadata
+     */
+    public KeyspaceMetadata returnKeyspaceMetadata(String keyspace) {
+        return cluster.getMetadata().getKeyspace(keyspace);
+    }
 
 
     /**
@@ -294,7 +296,7 @@ public class MusicDataStore {
      */
     public Map<String, HashMap<String, Object>> marshalData(ResultSet results) {
         Map<String, HashMap<String, Object>> resultMap =
-                        new HashMap<>();
+                new HashMap<>();
         int counter = 0;
         for (Row row : results) {
             ColumnDefinitions colInfo = row.getColumnDefinitions();
@@ -306,7 +308,7 @@ public class MusicDataStore {
                                 getBlobValue(row, definition.getName(), definition.getType()));
                     } else {
                         resultOutput.put(definition.getName(),
-                                    getColValue(row, definition.getName(), definition.getType()));
+                                getColValue(row, definition.getName(), definition.getType()));
                     }
                 }
             }
@@ -318,7 +320,7 @@ public class MusicDataStore {
 
 
     // Prepared Statements 1802 additions
-    
+
     public boolean executePut(PreparedQueryObject queryObject, String consistency)
             throws MusicServiceException, MusicQueryException {
         return executePut(queryObject, consistency, 0);
@@ -334,19 +336,19 @@ public class MusicDataStore {
      * @throws MusicQueryException
      */
     public boolean executePut(PreparedQueryObject queryObject, String consistency,long timeSlot)
-                    throws MusicServiceException, MusicQueryException {
+            throws MusicServiceException, MusicQueryException {
 
         boolean result = false;
         long timeOfWrite = System.currentTimeMillis();
         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
             logger.error(EELFLoggerDelegate.errorLogger, queryObject.getQuery(),AppMessages.QUERYERROR, ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
             throw new MusicQueryException("Ill formed queryObject for the request = " + "["
-                            + queryObject.getQuery() + "]");
+                    + queryObject.getQuery() + "]");
         }
         logger.debug(EELFLoggerDelegate.applicationLogger,
-                        "In preprared Execute Put: the actual insert query:"
-                                        + queryObject.getQuery() + "; the values"
-                                        + queryObject.getValues());
+                "In preprared Execute Put: the actual insert query:"
+                        + queryObject.getQuery() + "; the values"
+                        + queryObject.getValues());
         SimpleStatement preparedInsert = null;
 
         try {
@@ -372,26 +374,29 @@ public class MusicDataStore {
 
             ResultSet rs = session.execute(preparedInsert);
             result = rs.wasApplied();
-
-        }
-        catch (AlreadyExistsException ae) {
-            // logger.error(EELFLoggerDelegate.errorLogger,"AlreadExistsException: " + ae.getMessage(),AppMessages.QUERYERROR,
-            // ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
-            throw new MusicQueryException("AlreadyExistsException: " + ae.getMessage(),ae);
-        } catch ( InvalidQueryException e ) {
-            // logger.error(EELFLoggerDelegate.errorLogger,"InvalidQueryException: " + e.getMessage(),AppMessages.SESSIONFAILED + " [" 
-            // + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
-            throw new MusicQueryException("InvalidQueryException: " + e.getMessage(),e);
+        } catch (AlreadyExistsException ae) {
+            throw new MusicServiceException("Already Exists Exception: " + ae.getMessage());
+        } catch (InvalidQueryException e) {
+            if (e.getMessage().contains("unconfigured table")) {
+                throw new MusicServiceException("Invalid Query Exception: " + e.getMessage());
+            } else {
+                logger.info(EELFLoggerDelegate.applicationLogger, "Query Exception: " + e.getMessage(),
+                        AppMessages.SESSIONFAILED + " [" + queryObject.getQuery() + "]", ErrorSeverity.INFO,
+                        ErrorTypes.QUERYERROR, e);
+                throw new MusicServiceException("Query Exception: " + e.getMessage());
+            }
         } catch (Exception e) {
-            // logger.error(EELFLoggerDelegate.errorLogger,e.getClass().toString() + ":" + e.getMessage(),AppMessages.SESSIONFAILED + " [" 
-            //     + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, e);
-            throw new MusicServiceException("Executing Session Failure for Request = " + "["
-                + queryObject.getQuery() + "]" + " Reason = " + e.getMessage(),e);
+            logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),
+                    AppMessages.SESSIONFAILED + " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR,
+                    ErrorTypes.QUERYERROR, e);
+            throw new MusicServiceException("Executing Session Failure for Request = " + "[" + queryObject.getQuery()
+                    + "]" + " Reason = " + e.getMessage());
         }
+
         return result;
     }
 
- /*   *//**
   /*   *//**
      * This method performs DDL operations on Cassandra using consistency level ONE.
      *
      * @param queryObject Object containing cassandra prepared query and values.
@@ -434,15 +439,15 @@ public class MusicDataStore {
         return results;
     }
 
-    *//**
-     *
-     * This method performs DDL operation on Cassandra using consistency level QUORUM.
-     *
-     * @param queryObject Object containing cassandra prepared query and values.
-     * @return ResultSet
-     * @throws MusicServiceException
-     * @throws MusicQueryException
-     *//*
+      *//**
+      *
+      * This method performs DDL operation on Cassandra using consistency level QUORUM.
+      *
+      * @param queryObject Object containing cassandra prepared query and values.
+      * @return ResultSet
+      * @throws MusicServiceException
+      * @throws MusicQueryException
+      *//*
     public ResultSet executeCriticalGet(PreparedQueryObject queryObject)
                     throws MusicServiceException, MusicQueryException {
         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
@@ -465,50 +470,57 @@ public class MusicDataStore {
         return results;
 
     }
-    */
+       */
     public ResultSet executeGet(PreparedQueryObject queryObject,String consistencyLevel) throws MusicQueryException, MusicServiceException {
         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
             logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
             throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
-                            + queryObject.getQuery() + "]");
+                    + queryObject.getQuery() + "]");
         }
         ResultSet results = null;
         try {
             SimpleStatement statement = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray());
-
             if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_ONE)) {
-                if(queryObject.getConsistency() == null) {
                     statement.setConsistencyLevel(ConsistencyLevel.ONE);
-                } else {
-                    statement.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
-                }
-            }
-            else if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_QUORUM)) {
+            } else if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_QUORUM)) {
                 statement.setConsistencyLevel(ConsistencyLevel.QUORUM);
+            } else if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_LOCAL_QUORUM)) {
+                statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
             }
 
             results = session.execute(statement);
 
         } catch (Exception ex) {
             logger.error(EELFLoggerDelegate.errorLogger, "Execute Get Error" + ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject
-                .getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, ex);
+                    .getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, ex);
             throw new MusicServiceException("Execute Get Error" + ex.getMessage());
         }
-        
+
         return results;
-        
+
     }
-    
+
     /**
      * This method performs DDL operations on Cassandra using consistency level ONE.
      * 
      * @param queryObject Object containing cassandra prepared query and values.
      */
     public ResultSet executeOneConsistencyGet(PreparedQueryObject queryObject)
-                    throws MusicServiceException, MusicQueryException {
+            throws MusicServiceException, MusicQueryException {
         return executeGet(queryObject, CONSISTENCY_LEVEL_ONE);
     }
 
+    /**
+     * 
+     * This method performs DDL operation on Cassandra using consistency level LOCAL_QUORUM.
+     * 
+     * @param queryObject Object containing cassandra prepared query and values.
+     */
+    public ResultSet executeLocalQuorumConsistencyGet(PreparedQueryObject queryObject)
+            throws MusicServiceException, MusicQueryException {
+        return executeGet(queryObject, CONSISTENCY_LEVEL_LOCAL_QUORUM);
+    }
+
     /**
      * 
      * This method performs DDL operation on Cassandra using consistency level QUORUM.
@@ -516,7 +528,7 @@ public class MusicDataStore {
      * @param queryObject Object containing cassandra prepared query and values.
      */
     public ResultSet executeQuorumConsistencyGet(PreparedQueryObject queryObject)
-                    throws MusicServiceException, MusicQueryException {
+            throws MusicServiceException, MusicQueryException {
         return executeGet(queryObject, CONSISTENCY_LEVEL_QUORUM);
     }