2 * ============LICENSE_START====================================================
4 * =============================================================================
5 * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
6 * =============================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END======================================================
20 package org.onap.music.mdbc;
22 import java.util.ArrayList;
23 import java.util.List;
25 import org.apache.commons.lang3.tuple.Pair;
26 import org.onap.music.exceptions.MDBCServiceException;
27 import org.onap.music.exceptions.QueryException;
28 import org.onap.music.logging.EELFLoggerDelegate;
29 import org.onap.music.logging.format.AppMessages;
30 import org.onap.music.logging.format.ErrorSeverity;
31 import org.onap.music.logging.format.ErrorTypes;
32 import org.onap.music.mdbc.mixins.DBInterface;
33 import org.onap.music.mdbc.mixins.MixinFactory;
34 import org.onap.music.mdbc.mixins.MusicInterface;
35 import org.onap.music.mdbc.mixins.MusicInterface.OwnershipReturn;
36 import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint;
37 import org.onap.music.mdbc.tables.MriReference;
38 import org.onap.music.mdbc.tables.MusicTxDigestDaemon;
39 import org.onap.music.mdbc.tables.MusicTxDigestId;
40 import org.onap.music.mdbc.tables.TxCommitProgress;
42 import java.io.IOException;
43 import java.sql.Connection;
44 import java.sql.DriverManager;
45 import java.sql.SQLException;
46 import java.sql.Statement;
47 import java.util.HashMap;
48 import java.util.HashSet;
50 import java.util.Properties;
52 import java.util.concurrent.ConcurrentHashMap;
53 import java.util.concurrent.locks.Lock;
54 import java.util.concurrent.locks.ReentrantLock;
57 * \TODO Implement an interface for the server logic and a factory
58 * @author Enrique Saurez
60 public class StateManager {
62 //\TODO We need to fix the auto-commit mode and multiple transactions with the same connection
// Class-wide logger for StateManager, obtained from EELFLoggerDelegate.
64 private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(StateManager.class);
67 * This is the interface used by all the MusicSqlManagers,
68 * that are created by the MDBC Server
// Assigned in initMusic() from MixinFactory.createMusicInterface(...).
71 private MusicInterface musicInterface;
73 * This is the Running Queries information table.
74 * It mainly contains information about the entities
75 * that have been committed so far.
// Transaction progress tracker; created in the four-argument constructor and updated by
// openConnection/closeConnection/getConnection.
77 private TxCommitProgress transactionInfo;
// MDBC connections keyed by connection id; created in initMusic(), filled by openConnection(),
// pruned by closeConnection().
78 private Map<String,MdbcConnection> mdbcConnections;
// Name of the SQL database; appended to sqlDBUrl (after a '/') when a connection is opened.
79 private String sqlDBName;
// SQL server URL as returned by cleanSqlUrl().
80 private String sqlDBUrl;
// Merged configuration: classpath property files, MDBCUtils.getMdbcProperties() and the
// caller-supplied properties (see the four-argument constructor).
84 private Properties info;
86 /** Identifier for this server instance */
87 private String mdbcServerName;
88 private Map<String,DatabasePartition> connectionRanges;//Each connection owns its own database partition
// Lock released by getEventualRanges()/setEventualRanges() after they touch eventualRanges.
89 private final Lock eventualLock = new ReentrantLock();
// Ranges treated as eventually consistent; may be null (getEventualRanges() checks for that).
90 private Set<Range> eventualRanges;
91 /** lock for warmupRanges */
92 private final Lock warmupLock = new ReentrantLock();
93 /** a set of ranges that should be periodically updated with latest information, if null all tables should be warmed up */
94 private Set<Range> rangesToWarmup;
95 /** map of transactions that have already been applied/updated in this site's SQL db */
96 private Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied;
// Built in the four-argument constructor from alreadyApplied and the configured ownership timeout.
97 private OwnershipAndCheckpoint ownAndCheck;
// Thread wrapping the MusicTxDigestDaemon; created in initTxDaemonThread().
98 private Thread txDaemon ;
101 * For testing purposes only
// No-argument constructor. No statements are visible in its body, so none of the fields
// initialised by the four-argument constructor appear to be set here.
104 public StateManager() {
/**
 * Creates the state manager for one MDBC server instance.
 *
 * Visible steps: stores the database name and the cleaned SQL URL, creates the per-connection
 * partition map and the transaction tracker, builds the configuration (classpath
 * "music.properties" and "key.properties", then MDBCUtils.getMdbcProperties(), then newInfo),
 * reads the Cassandra URL and MUSIC mixin name, sets the static MDBCUtils.writeLocksOnly flag,
 * creates the transaction-digest daemon thread and finally the OwnershipAndCheckpoint helper.
 *
 * @param sqlDBUrl URL of the SQL server; passed through cleanSqlUrl before being stored
 * @param newInfo properties applied last, so they replace same-named keys loaded earlier
 * @param mdbcServerName identifier of this server instance
 * @param sqlDBName name of the SQL database
 * @throws MDBCServiceException declared by the signature
 */
107 public StateManager(String sqlDBUrl, Properties newInfo, String mdbcServerName, String sqlDBName) throws MDBCServiceException {
108 this.sqlDBName = sqlDBName;
// NOTE(review): cleanSqlUrl and initTxDaemonThread are protected (overridable) methods invoked
// from a constructor; a subclass override would run before the subclass is initialised.
109 this.sqlDBUrl = cleanSqlUrl(sqlDBUrl);
110 this.info = new Properties();
111 this.mdbcServerName = mdbcServerName;
113 this.connectionRanges = new ConcurrentHashMap<>();
114 this.transactionInfo = new TxCommitProgress();
115 //\fixme this might not be used, delete?
// NOTE(review): getResourceAsStream returns null when the resource is missing, and
// Properties.load(null) throws NullPointerException, which the IOException handler below does
// not catch. Properties.load also leaves the stream open; neither stream is closed here.
117 info.load(this.getClass().getClassLoader().getResourceAsStream("music.properties"));
118 info.load(this.getClass().getClassLoader().getResourceAsStream("key.properties"));
119 info.putAll(MDBCUtils.getMdbcProperties());
120 } catch (IOException e) {
// Load failures are only logged; construction continues with whatever was loaded so far.
121 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage());
// Caller-supplied properties are applied after the files, so they take precedence.
123 info.putAll(newInfo);
124 cassandraUrl = info.getProperty(Configuration.KEY_CASSANDRA_URL, Configuration.CASSANDRA_URL_DEFAULT);
125 musicmixin = info.getProperty(Configuration.KEY_MUSIC_MIXIN_NAME, Configuration.MUSIC_MIXIN_DEFAULT);
127 String writeLocksOnly = info.getProperty(Configuration.KEY_WRITE_LOCKS_ONLY);
// Sets a static field on MDBCUtils, i.e. the value is shared by every StateManager instance.
128 MDBCUtils.writeLocksOnly = (writeLocksOnly==null) ? Configuration.WRITE_LOCK_ONLY_DEFAULT : Boolean.parseBoolean(writeLocksOnly);
132 initTxDaemonThread();
133 String t = info.getProperty(Configuration.KEY_OWNERSHIP_TIMEOUT);
// NOTE(review): timeout is declared long but the configured value is parsed with
// Integer.parseInt, so values above Integer.MAX_VALUE throw NumberFormatException;
// Long.parseLong would match the declared type.
134 long timeout = (t == null) ? Configuration.DEFAULT_OWNERSHIP_TIMEOUT : Integer.parseInt(t);
135 alreadyApplied = new ConcurrentHashMap<>();
136 ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied, timeout);
/**
 * Normalises a SQL URL by dropping a single trailing '/' when the URL is non-empty and ends
 * with one.
 *
 * @param url the URL to normalise
 * @return NOTE(review): presumably the normalised URL (the constructor stores the result in
 *         sqlDBUrl); confirm against the return statement
 */
139 protected String cleanSqlUrl(String url){
// Only the last character is examined, so "a//" becomes "a/" rather than "a".
142 if (url.length() > 0 && url.charAt(url.length() - 1) == '/') {
143 url= url.substring(0, url.length() - 1);
/**
 * Creates the thread that runs the MusicTxDigestDaemon, names it "TxDaemon" and marks it as a
 * daemon thread.
 *
 * The daemon's first constructor argument is the Configuration.TX_DAEMON_SLEEPTIME_S property
 * (default Configuration.TX_DAEMON_SLEEPTIME_S_DEFAULT) parsed with Integer.parseInt, so a
 * non-numeric configuration value raises NumberFormatException here.
 */
149 protected void initTxDaemonThread(){
150 txDaemon = new Thread(
151 new MusicTxDigestDaemon(Integer.parseInt(
152 info.getProperty(Configuration.TX_DAEMON_SLEEPTIME_S, Configuration.TX_DAEMON_SLEEPTIME_S_DEFAULT)),
154 txDaemon.setName("TxDaemon");
155 txDaemon.setDaemon(true);
160 * Initialize all the interfaces and datastructures
161 * @throws MDBCServiceException
// Creates the MUSIC interface through MixinFactory (using the configured mixin name, the server
// name and the merged properties) and an empty connection map.
163 protected void initMusic() throws MDBCServiceException {
164 this.musicInterface = MixinFactory.createMusicInterface(this, musicmixin, mdbcServerName, info);
// NOTE(review): a plain HashMap, whereas connectionRanges is a ConcurrentHashMap; confirm that
// mdbcConnections is only touched from one thread or is synchronised by callers.
165 this.mdbcConnections = new HashMap<>();
/**
 * Prepares the local SQL database.
 *
 * For URLs that do not start with "jdbc:postgresql" (case-insensitive) a connection to the
 * server is opened and a "CREATE DATABASE IF NOT EXISTS ..." statement is executed; an
 * SQLException is logged and rethrown as MDBCServiceException with the original as cause.
 * Afterwards a temporary MDBC connection with id "init" is opened, initDatabase() is run on it
 * and the connection is closed again; a QueryException from that step is logged.
 *
 * @throws MDBCServiceException when creating the database fails with an SQLException
 */
168 protected void initSqlDatabase() throws MDBCServiceException {
169 if(!this.sqlDBUrl.toLowerCase().startsWith("jdbc:postgresql")) {
171 Connection sqlConnection = DriverManager.getConnection(this.sqlDBUrl, this.info);
172 StringBuilder sql = new StringBuilder("CREATE DATABASE IF NOT EXISTS ")
175 Statement stmt = sqlConnection.createStatement();
176 stmt.execute(sql.toString());
// NOTE(review): close() is reached only when execute() succeeds; verify that the connection
// and statement are also released on the SQLException path (try-with-resources would do it).
177 sqlConnection.close();
178 } catch (SQLException e) {
179 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.UNKNOWNERROR,
180 ErrorSeverity.CRITICAL,
181 ErrorTypes.GENERALSERVICEERROR);
182 throw new MDBCServiceException(e.getMessage(), e);
186 // Verify the tables in MUSIC match the tables in the database
187 // and create triggers on any tables that need them
// NOTE(review): openConnection returns null when the MdbcConnection cannot be constructed, in
// which case the initDatabase() call below throws NullPointerException.
189 MdbcConnection mdbcConn = (MdbcConnection) openConnection("init");
190 mdbcConn.initDatabase();
191 closeConnection("init");
192 } catch (QueryException e) {
193 logger.error("Error syncrhonizing tables");
194 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.QUERYERROR, ErrorTypes.QUERYERROR, ErrorSeverity.CRITICAL);
199 * Get list of ranges to warmup from configuration file
200 * if no configuration is provided, will return null
// Parses the Configuration.KEY_WARMUPRANGES property as a comma-separated list; each entry is
// trimmed and wrapped in a Range, and the Ranges are collected in a HashSet (so duplicates
// collapse if Range defines equals/hashCode accordingly).
203 private Set<Range> initWarmupRanges() {
204 String warmupString = info.getProperty(Configuration.KEY_WARMUPRANGES);
// Property absent: no explicit warm-up list is configured.
205 if (warmupString==null) {
208 Set<Range> warmupRanges = new HashSet<>();
209 String[] ranges = warmupString.split(",");
210 for (String range: ranges) {
// NOTE(review): an empty entry (e.g. from "a,,b") is passed to new Range("") as-is.
211 warmupRanges.add(new Range(range.trim()));
/**
 * @return the MUSIC interface held by this state manager (assigned in initMusic())
 */
216 public MusicInterface getMusicInterface() {
217 return this.musicInterface;
/**
 * @return a new list holding the DatabasePartition of every entry currently in
 *         connectionRanges; the list itself is a copy, the partition objects are shared
 */
220 public List<DatabasePartition> getPartitions() {
221 return new ArrayList<>(connectionRanges.values());
225 * Get a list of ranges that are to be periodically warmed up
227 * If no list is specified, all ranges except eventual consistency ranges are returned
// Chooses the warm-up set: the explicitly configured rangesToWarmup when it is non-null,
// otherwise every range reported by getAllRanges() minus the eventual-consistency ranges.
230 public Set<Range> getRangesToWarmup() {
232 Set<Range> returnSet;
234 if(rangesToWarmup!=null) {
// NOTE(review): this aliases the internal set rather than copying it (getEventualRanges copies).
235 returnSet = rangesToWarmup;
238 returnSet = getAllRanges();
// NOTE(review): eventualRanges is iterated without a null check although getEventualRanges()
// treats null as a valid state; a null field would throw NullPointerException here.
239 for (Range eventualRange: eventualRanges) {
240 returnSet.remove(eventualRange);
251 * Get a set of all ranges seen in the sql db
// Asks the DBInterface of the connection registered under the id "daemon" (opened on demand by
// getConnection) for its SQL range set.
254 private Set<Range> getAllRanges() {
// NOTE(review): getConnection can hand back null when openConnection fails, which would make
// this line throw NullPointerException.
255 DBInterface dbi = ((MdbcConnection) getConnection("daemon")).getDBInterface();
256 return dbi.getSQLRangeSet();
260 * Get a list of ranges that are eventually consistent
// Builds a defensive copy of eventualRanges, or an empty set when the field is null, so callers
// never receive the internal set. eventualLock is released before the method finishes.
263 public Set<Range> getEventualRanges() {
265 Set<Range> returnSet;
267 if(eventualRanges!=null){
268 returnSet = new HashSet<>(eventualRanges);
271 returnSet= new HashSet<>();
275 eventualLock.unlock();
/**
 * Replaces the set of eventually consistent ranges and releases eventualLock afterwards.
 *
 * @param eventualRanges the new set; stored by reference (not copied), may be null
 */
280 public void setEventualRanges(Set<Range> eventualRanges) {
283 this.eventualRanges = eventualRanges;
286 eventualLock.unlock();
/**
 * @return the identifier of this MDBC server instance
 */
290 public String getMdbcServerName() {
291 return mdbcServerName;
/**
 * @param mdbcServerName the new identifier for this MDBC server instance
 */
294 public void setMdbcServerName(String mdbcServerName) {
// Only the field changes; the MusicInterface created in initMusic() received the earlier name.
295 this.mdbcServerName = mdbcServerName;
299 * Close connection and relinquish any locks held for that connection
300 * @param connectionId
// For a known connection id: drops its transaction progress, inspects the stored connection
// (non-null and not yet closed), logs any SQLException and removes the id from mdbcConnections.
// The id's partition entry is removed from connectionRanges as well.
302 public void closeConnection(String connectionId){
303 //\TODO check if there is a race condition
304 if(mdbcConnections.containsKey(connectionId)) {
305 transactionInfo.deleteTxProgress(connectionId);
307 Connection conn = mdbcConnections.get(connectionId);
308 if (conn!=null && !conn.isClosed()) {
311 } catch (SQLException e) {
// The SQLException is logged, not propagated; the map clean-up below still runs.
312 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL,
313 ErrorTypes.GENERALSERVICEERROR);
// This structural removal matters to any caller that is iterating mdbcConnections.
315 mdbcConnections.remove(connectionId);
318 connectionRanges.remove(connectionId);
323 * Opens a connection into database, setting up all necessary triggers, etc
324 * @param id UUID of a connection
// Flow: register default drivers, open a JDBC connection to sqlDBUrl + "/" + sqlDBName, create
// a DatabasePartition with a fresh MUSIC key and record it in connectionRanges, wrap everything
// in an MdbcConnection, create a transaction tracker for the id, and store the MdbcConnection
// in mdbcConnections when it was built successfully.
// Returns the new MdbcConnection, or null when its construction threw MDBCServiceException.
326 public Connection openConnection(String id) {
327 Connection sqlConnection;
328 MdbcConnection newConnection;
329 Utils.registerDefaultDrivers();
330 //Create connection to local SQL DB
333 sqlConnection = DriverManager.getConnection(this.sqlDBUrl+"/"+this.sqlDBName, this.info);
334 } catch (SQLException e) {
335 logger.error("sql connection was not created correctly");
336 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL,
337 ErrorTypes.QUERYERROR);
// NOTE(review): the failure is only logged; a null sqlConnection is then handed to the
// MdbcConnection constructor and to createNewTransactionTracker below.
338 sqlConnection = null;
341 //TODO: later we could try to match it to some more sticky client id
342 DatabasePartition ranges=new DatabasePartition(musicInterface.generateUniqueKey());
// Registered before the MdbcConnection exists, so the entry is present even if creation fails.
343 connectionRanges.put(id,ranges);
345 //Create MDBC connection
347 newConnection = new MdbcConnection(id,this.sqlDBUrl+"/"+this.sqlDBName, sqlConnection, info, this.musicInterface,
348 transactionInfo,ranges, this);
349 } catch (MDBCServiceException e) {
350 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL,
351 ErrorTypes.QUERYERROR);
352 newConnection = null;
// NOTE(review): this message is logged whether or not newConnection is null.
354 logger.info(EELFLoggerDelegate.applicationLogger,"Connection created for connection: "+id);
356 transactionInfo.createNewTransactionTracker(id, sqlConnection);
357 if(newConnection != null) {
358 mdbcConnections.put(id,newConnection);
360 return newConnection;
365 * This function returns the connection to the corresponding transaction
366 * @param id of the transaction, created using
// Returns the connection already registered under id, first resetting its transaction progress
// when the previous transaction is marked complete. Unknown ids fall through to
// openConnection(id), so the result can be null if that call fails.
369 public Connection getConnection(String id) {
370 if(mdbcConnections.containsKey(id)) {
371 //\TODO: Verify if this make sense
372 // Intent: reinitialize transaction progress, when it already completed the previous tx for the same connection
373 if(transactionInfo.isComplete(id)) {
374 transactionInfo.reinitializeTxProgress(id);
376 return mdbcConnections.get(id);
379 return openConnection(id);
/**
 * Placeholder for system initialisation (data prefetching); not implemented.
 *
 * @throws UnsupportedOperationException always
 */
382 public void initializeSystem() {
383 //\TODO Prefetch data to system using the data ranges as guide
384 throw new UnsupportedOperationException("Function initialize system needs to be implemented id MdbcStateManager");
/**
 * Asks the MUSIC interface to relinquish the given partition; a failure is logged and not
 * propagated to the caller.
 *
 * @param partition the partition to give up
 */
387 private void relinquish(DatabasePartition partition){
389 musicInterface.relinquish(partition);
390 } catch (MDBCServiceException e) {
// NOTE(review): the caught exception is not included in the log entry, so the cause is lost.
391 logger.error("Relinquish failed, would need to forcefully obtain lock later");
/**
 * Replaces the set of ranges to warm up.
 *
 * @param warmupRanges the new set, stored by reference; null makes getRangesToWarmup() fall
 *        back to all ranges seen in the SQL database minus the eventual ranges
 */
396 public void setWarmupRanges(Set<Range> warmupRanges) {
399 this.rangesToWarmup = warmupRanges;
// Accessor for the OwnershipAndCheckpoint helper.
// NOTE(review): presumably returns the ownAndCheck field built in the four-argument
// constructor; confirm against the method body.
406 public OwnershipAndCheckpoint getOwnAndCheck() {
411 * Close all connections for this server, relinquishing any locks/partitions owned by this server
// Closes every connection currently registered in mdbcConnections via closeConnection(id).
// NOTE(review): closeConnection removes the id from mdbcConnections, which initMusic() creates
// as a HashMap, while this loop iterates that map's live keySet. With more than one registered
// connection the iterator is expected to fail with ConcurrentModificationException; iterating
// over a snapshot (e.g. new ArrayList<>(mdbcConnections.keySet())) would avoid that.
413 public void releaseAllPartitions() {
414 for(String connection: this.mdbcConnections.keySet()) {
415 closeConnection(connection);