Various bug fixes and code cleanup
[music/mdbc.git] / mdbc-server / src / main / java / org / onap / music / mdbc / StateManager.java
1 /*
2  * ============LICENSE_START====================================================
3  * org.onap.music.mdbc
4  * =============================================================================
5  * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
6  * =============================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END======================================================
19  */
20 package org.onap.music.mdbc;
21
22 import java.util.ArrayList;
23 import java.util.List;
24
25 import org.apache.commons.lang3.tuple.Pair;
26 import org.onap.music.exceptions.MDBCServiceException;
27 import org.onap.music.exceptions.QueryException;
28 import org.onap.music.logging.EELFLoggerDelegate;
29 import org.onap.music.logging.format.AppMessages;
30 import org.onap.music.logging.format.ErrorSeverity;
31 import org.onap.music.logging.format.ErrorTypes;
32 import org.onap.music.mdbc.mixins.DBInterface;
33 import org.onap.music.mdbc.mixins.MixinFactory;
34 import org.onap.music.mdbc.mixins.MusicInterface;
35 import org.onap.music.mdbc.mixins.MusicInterface.OwnershipReturn;
36 import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint;
37 import org.onap.music.mdbc.tables.MriReference;
38 import org.onap.music.mdbc.tables.MusicTxDigestDaemon;
39 import org.onap.music.mdbc.tables.MusicTxDigestId;
40 import org.onap.music.mdbc.tables.TxCommitProgress;
41
42 import java.io.IOException;
43 import java.sql.Connection;
44 import java.sql.DriverManager;
45 import java.sql.SQLException;
46 import java.sql.Statement;
47 import java.util.HashMap;
48 import java.util.HashSet;
49 import java.util.Map;
50 import java.util.Properties;
51 import java.util.Set;
52 import java.util.concurrent.ConcurrentHashMap;
53 import java.util.concurrent.locks.Lock;
54 import java.util.concurrent.locks.ReentrantLock;
55
56 /**
57  * \TODO Implement an interface for the server logic and a factory 
58  * @author Enrique Saurez
59  */
60 public class StateManager {
61
62         //\TODO We need to fix the auto-commit mode and multiple transactions with the same connection
63
64         private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(StateManager.class);
65
66         /**
67          * This is the interface used by all the MusicSqlManagers, 
68          * that are created by the MDBC Server 
69          * @see MusicInterface 
70      */
71     private MusicInterface musicInterface;
72     /**
73      * This is the Running Queries information table.
74      * It mainly contains information about the entities 
75      * that have being committed so far.
76      */
77     private TxCommitProgress transactionInfo;
78     private Map<String,MdbcConnection> mdbcConnections;
79     private String sqlDBName;
80     private String sqlDBUrl;
81     
82     String musicmixin;
83     String cassandraUrl;
84     private Properties info;
85     
86     /** Identifier for this server instance */
87     private String mdbcServerName;
88     private Map<String,DatabasePartition> connectionRanges;//Each connection owns its own database partition
89     private final Lock eventualLock  = new ReentrantLock();
90     private Set<Range> eventualRanges;
91     /** lock for warmupRanges */
92     private final Lock warmupLock = new ReentrantLock();
93     /** a set of ranges that should be periodically updated with latest information, if null all tables should be warmed up */
94     private Set<Range> rangesToWarmup;
95     /** map of transactions that have already been applied/updated in this sites SQL db */
96     private Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied;
97     private OwnershipAndCheckpoint ownAndCheck;
98     private Thread txDaemon ;
99
100     /**
101      * For testing purposes only
102      */
103     @Deprecated
104     public StateManager() {
105     }
106     
107         public StateManager(String sqlDBUrl, Properties newInfo, String mdbcServerName, String sqlDBName) throws MDBCServiceException {
108         this.sqlDBName = sqlDBName;
109         this.sqlDBUrl = cleanSqlUrl(sqlDBUrl);
110         this.info = new Properties();
111         this.mdbcServerName = mdbcServerName;
112     
113         this.connectionRanges = new ConcurrentHashMap<>();
114         this.transactionInfo = new TxCommitProgress();
115         //\fixme this might not be used, delete?
116         try {
117                         info.load(this.getClass().getClassLoader().getResourceAsStream("music.properties"));
118                         info.load(this.getClass().getClassLoader().getResourceAsStream("key.properties"));
119                         info.putAll(MDBCUtils.getMdbcProperties());
120                 } catch (IOException e) {
121                         logger.error(EELFLoggerDelegate.errorLogger, e.getMessage());
122                 }
123                 info.putAll(newInfo);
124         cassandraUrl = info.getProperty(Configuration.KEY_CASSANDRA_URL, Configuration.CASSANDRA_URL_DEFAULT);
125         musicmixin = info.getProperty(Configuration.KEY_MUSIC_MIXIN_NAME, Configuration.MUSIC_MIXIN_DEFAULT);
126         
127         String writeLocksOnly = info.getProperty(Configuration.KEY_WRITE_LOCKS_ONLY);
128         MDBCUtils.writeLocksOnly = (writeLocksOnly==null) ? Configuration.WRITE_LOCK_ONLY_DEFAULT : Boolean.parseBoolean(writeLocksOnly);
129         
130         initMusic();
131         initSqlDatabase();
132         initTxDaemonThread();
133         String t = info.getProperty(Configuration.KEY_OWNERSHIP_TIMEOUT);
134         long timeout = (t == null) ? Configuration.DEFAULT_OWNERSHIP_TIMEOUT : Integer.parseInt(t);
135         alreadyApplied = new ConcurrentHashMap<>();
136         ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied, timeout);
137     }
138
139     protected String cleanSqlUrl(String url){
140             if(url!=null) {
141             url = url.trim();
142             if (url.length() > 0 && url.charAt(url.length() - 1) == '/') {
143                 url= url.substring(0, url.length() - 1);
144             }
145         }
146         return url;
147     }
148
149     protected void initTxDaemonThread(){
150         txDaemon = new Thread(
151             new MusicTxDigestDaemon(Integer.parseInt(
152                 info.getProperty(Configuration.TX_DAEMON_SLEEPTIME_S, Configuration.TX_DAEMON_SLEEPTIME_S_DEFAULT)),
153                 this));
154         txDaemon.setName("TxDaemon");
155         txDaemon.setDaemon(true);
156         txDaemon.start();
157     }
158
159     /**
160      * Initialize all the  interfaces and datastructures
161      * @throws MDBCServiceException
162      */
163     protected void initMusic() throws MDBCServiceException {
164         this.musicInterface = MixinFactory.createMusicInterface(this, musicmixin, mdbcServerName, info);
165         this.mdbcConnections = new HashMap<>();
166     }
167     
168     protected void initSqlDatabase() throws MDBCServiceException {
169         if(!this.sqlDBUrl.toLowerCase().startsWith("jdbc:postgresql")) {
170             try {
171                 Connection sqlConnection = DriverManager.getConnection(this.sqlDBUrl, this.info);
172                 StringBuilder sql = new StringBuilder("CREATE DATABASE IF NOT EXISTS ")
173                     .append(sqlDBName)
174                     .append(";");
175                 Statement stmt = sqlConnection.createStatement();
176                 stmt.execute(sql.toString());
177                 sqlConnection.close();
178             } catch (SQLException e) {
179                 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.UNKNOWNERROR,
180                     ErrorSeverity.CRITICAL,
181                     ErrorTypes.GENERALSERVICEERROR);
182                 throw new MDBCServiceException(e.getMessage(), e);
183             }
184         }
185         
186         // Verify the tables in MUSIC match the tables in the database
187         // and create triggers on any tables that need them
188         try {
189             MdbcConnection mdbcConn = (MdbcConnection) openConnection("init");
190             mdbcConn.initDatabase();
191             closeConnection("init");
192         } catch (QueryException e) {
193             logger.error("Error syncrhonizing tables");
194             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.QUERYERROR, ErrorTypes.QUERYERROR, ErrorSeverity.CRITICAL);
195         }
196     }
197     
198     /**
199      * Get list of ranges to warmup from configuration file
200      * if no configuration is provided, will return null
201      * @return
202      */
203     private Set<Range> initWarmupRanges() {
204         String warmupString = info.getProperty(Configuration.KEY_WARMUPRANGES);
205         if (warmupString==null) {
206             return null;
207         }
208         Set<Range> warmupRanges = new HashSet<>();
209         String[] ranges = warmupString.split(",");
210         for (String range: ranges) {
211             warmupRanges.add(new Range(range.trim()));
212         }
213         return warmupRanges;
214     }
215
216     public MusicInterface getMusicInterface() {
217         return this.musicInterface;
218     }
219     
220     public List<DatabasePartition> getPartitions() {
221         return new ArrayList<>(connectionRanges.values());
222         }
223
224     /**
225      * Get a list of ranges that are to be periodically warmed up
226      * 
227      * If no list is specified, all ranges except eventual consistency ranges are returned
228      * @return
229      */
230         public Set<Range> getRangesToWarmup() {
231         warmupLock.lock();
232         Set<Range> returnSet;
233         try {
234             if(rangesToWarmup!=null) {
235                 returnSet = rangesToWarmup;
236             }
237             else {
238                 returnSet = getAllRanges();
239                 for (Range eventualRange: eventualRanges) {
240                     returnSet.remove(eventualRange);
241                 }
242             }
243         }
244         finally{
245            warmupLock.unlock();
246         }
247         return returnSet;
248     }
249
250         /**
251          * Get a set of all ranges seen in the sql db
252          * @return
253          */
254         private Set<Range> getAllRanges() {
255             DBInterface dbi = ((MdbcConnection) getConnection("daemon")).getDBInterface();
256             return dbi.getSQLRangeSet();
257     }
258
259     /**
260          * Get a list of ranges that are eventually consistent
261          * @return
262          */
263     public Set<Range> getEventualRanges() {
264         eventualLock.lock();
265         Set<Range> returnSet;
266         try {
267             if(eventualRanges!=null){
268                 returnSet = new HashSet<>(eventualRanges);
269             }
270             else{
271                 returnSet= new HashSet<>();
272             }
273         }
274         finally{
275             eventualLock.unlock();
276         }
277         return returnSet;
278     }
279
280     public void setEventualRanges(Set<Range> eventualRanges) {
281         eventualLock.lock();
282         try {
283             this.eventualRanges = eventualRanges;
284         }
285         finally{
286             eventualLock.unlock();
287         }
288     }
289     
290     public String getMdbcServerName() {
291         return mdbcServerName;
292     }
293
294     public void setMdbcServerName(String mdbcServerName) {
295         this.mdbcServerName = mdbcServerName;
296     }
297
298     /**
299      * Close connection and relinquish any locks held for that connection
300      * @param connectionId
301      */
302     public void closeConnection(String connectionId){
303         //\TODO check if there is a race condition
304         if(mdbcConnections.containsKey(connectionId)) {
305             transactionInfo.deleteTxProgress(connectionId);
306             try {
307                 Connection conn = mdbcConnections.get(connectionId);
308                 if (conn!=null && !conn.isClosed()) {
309                     conn.close();
310                 }
311             } catch (SQLException e) {
312                 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL,
313                         ErrorTypes.GENERALSERVICEERROR);
314             }
315             mdbcConnections.remove(connectionId);
316         }
317         
318         connectionRanges.remove(connectionId);
319         
320     }
321
322     /**
323      * Opens a connection into database, setting up all necessary triggers, etc
324      * @param id UUID of a connection
325      */
326         public Connection openConnection(String id) {
327                 Connection sqlConnection;
328         MdbcConnection newConnection;
329         Utils.registerDefaultDrivers();
330         //Create connection to local SQL DB
331
332                 try {
333                         sqlConnection = DriverManager.getConnection(this.sqlDBUrl+"/"+this.sqlDBName, this.info);
334                 } catch (SQLException e) {
335                     logger.error("sql connection was not created correctly");
336                         logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL,
337                     ErrorTypes.QUERYERROR);
338                         sqlConnection = null;
339                 }
340                 
341                 //TODO: later we could try to match it to some more sticky client id
342         DatabasePartition ranges=new DatabasePartition(musicInterface.generateUniqueKey());
343         connectionRanges.put(id,ranges);
344         
345         //Create MDBC connection
346         try {
347                         newConnection = new MdbcConnection(id,this.sqlDBUrl+"/"+this.sqlDBName, sqlConnection, info, this.musicInterface,
348                 transactionInfo,ranges, this);
349                 } catch (MDBCServiceException e) {
350                         logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL,
351                     ErrorTypes.QUERYERROR);
352                         newConnection = null;
353                 }
354                 logger.info(EELFLoggerDelegate.applicationLogger,"Connection created for connection: "+id);
355
356         transactionInfo.createNewTransactionTracker(id, sqlConnection);
357         if(newConnection != null) {
358             mdbcConnections.put(id,newConnection);
359         }
360         return newConnection;
361         }
362     
363     
364     /**
365      * This function returns the connection to the corresponding transaction 
366      * @param id of the transaction, created using
367      * @return
368      */
369     public Connection getConnection(String id) {
370         if(mdbcConnections.containsKey(id)) {
371             //\TODO: Verify if this make sense
372             // Intent: reinitialize transaction progress, when it already completed the previous tx for the same connection
373             if(transactionInfo.isComplete(id)) {
374                 transactionInfo.reinitializeTxProgress(id);
375             }
376             return mdbcConnections.get(id);
377         }
378
379         return openConnection(id);
380     }
381     
382         public void initializeSystem() {
383                 //\TODO Prefetch data to system using the data ranges as guide 
384                 throw new UnsupportedOperationException("Function initialize system needs to be implemented id MdbcStateManager");
385         }
386
387         private void relinquish(DatabasePartition partition){
388         try {
389             musicInterface.relinquish(partition);
390         } catch (MDBCServiceException e) {
391             logger.error("Relinquish failed, would need to forcefully obtain lock later");
392         }
393
394     }
395
396     public void setWarmupRanges(Set<Range> warmupRanges) {
397         warmupLock.lock();
398         try {
399             this.rangesToWarmup = warmupRanges;
400         }
401         finally{
402             warmupLock.unlock();
403         }
404     }
405     
406     public OwnershipAndCheckpoint getOwnAndCheck() {
407         return ownAndCheck;
408     }
409
410     /**
411      * Close all connections for this server, relinquishing any locks/partitions owned by this server
412      */
413     public void releaseAllPartitions() {
414         for(String connection: this.mdbcConnections.keySet()) {
415             closeConnection(connection);
416         } 
417     }
418 }