2 * ============LICENSE_START========================================== org.onap.music
3 * =================================================================== Copyright (c) 2017 AT&T
4 * Intellectual Property ===================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6 * in compliance with the License. You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software distributed under the License
11 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12 * or implied. See the License for the specific language governing permissions and limitations under
15 * ============LICENSE_END=============================================
16 * ====================================================================
18 package org.onap.music.lockingservice;
21 import java.util.List;
22 import java.util.SortedSet;
23 import java.util.TreeSet;
24 import org.apache.zookeeper.CreateMode;
25 import org.apache.zookeeper.KeeperException;
26 import org.apache.zookeeper.KeeperException.NoNodeException;
27 import org.apache.zookeeper.ZooDefs;
28 import org.apache.zookeeper.ZooKeeper;
29 import org.apache.zookeeper.data.ACL;
30 import org.apache.zookeeper.data.Stat;
31 import org.onap.music.datastore.PreparedQueryObject;
32 import org.onap.music.eelf.logging.EELFLoggerDelegate;
33 import org.onap.music.eelf.logging.format.AppMessages;
34 import org.onap.music.eelf.logging.format.ErrorSeverity;
35 import org.onap.music.eelf.logging.format.ErrorTypes;
36 import org.onap.music.main.MusicCore;
37 import org.onap.music.main.MusicUtil;
39 import com.datastax.driver.core.DataType;
42 * A <a href="package.html">protocol to implement an exclusive write lock or to elect a leader</a>.
44 * You invoke {@link #lock(String, String)} to start the process of grabbing the lock; you may get the lock then
45 * or it may be some time later.
47 * You can register a listener so that you are invoked when you get the lock; otherwise you can ask
48 * if you have the lock by calling {@link #isOwner()}
// Lock service that talks to ZooKeeper; it extends ProtocolSupport, which is declared elsewhere.
51 public class ZkStatelessLockService extends ProtocolSupport {
// Constructor taking the ZooKeeper client handle.
// NOTE(review): the constructor body is not visible in this view; presumably it stores zk in the
// `zookeeper` member used by the methods below -- confirm.
52 public ZkStatelessLockService(ZooKeeper zk) {
// Class-wide logger used by the methods of this class for error, debug and info output.
56 private static EELFLoggerDelegate logger =
57 EELFLoggerDelegate.getLogger(ZkStatelessLockService.class);
// Creates a znode at `path` holding `data`, running the create call through retryOperation.
// InterruptedException and KeeperException raised by the attempt are logged as locking errors.
59 protected void createLock(final String path, final byte[] data) {
// The node is created with the completely open ACL ZooDefs.Ids.OPEN_ACL_UNSAFE.
60 final List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
62 retryOperation(new ZooKeeperOperation() {
63 public boolean execute() throws KeeperException, InterruptedException {
// PERSISTENT mode: the znode is not removed when the creating client disconnects.
64 zookeeper.create(path, data, acl, CreateMode.PERSISTENT);
68 }catch (InterruptedException e) {
69 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
70 }catch (KeeperException e) {
71 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.KEEPERERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
78 }catch (InterruptedException e) {
79 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
// Overwrites the data stored at the znode "/" + lockName, running the write through retryOperation.
// InterruptedException and KeeperException raised by the attempt are logged as locking errors.
83 public void setNodeData(final String lockName, final byte[] data) {
85 retryOperation(new ZooKeeperOperation() {
86 public boolean execute() throws KeeperException, InterruptedException {
// NOTE(review): the session id returned here is discarded -- confirm whether this call is needed.
87 zookeeper.getSessionId();
// Version -1 makes the write unconditional: it matches whatever version the node currently has.
88 zookeeper.setData("/" + lockName, data, -1);
92 }catch (InterruptedException e) {
93 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
94 }catch (KeeperException e) {
95 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.KEEPERERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
// Reads the data stored at the znode "/" + lockName.
// NOTE(review): the value returned when the node is missing, or after an exception has been
// logged, is not visible in this view -- confirm against the full file.
100 public byte[] getNodeData(final String lockName) {
// Existence is checked first (null watcher), then the data is fetched without setting a watch.
102 if (zookeeper.exists("/" + lockName, null) != null)
103 return zookeeper.getData("/" + lockName, false, null);
107 }catch (InterruptedException e) {
108 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
109 }catch (KeeperException e) {
110 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.KEEPERERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
// Queries ZooKeeper for a znode at lockName (no watch set) and reports the outcome as a boolean.
// Unlike getNodeData/setNodeData, lockName is passed to ZooKeeper as-is, without a "/" prefix.
// NOTE(review): the statements that turn `stat` into `result` are not visible in this view;
// presumably `result` becomes true when `stat` is non-null -- confirm.
115 public boolean checkIfLockExists(String lockName) {
// Starts out false; InterruptedException and KeeperException below are logged as locking errors.
116 boolean result = false;
118 Stat stat = zookeeper.exists(lockName, false);
122 }catch (InterruptedException e) {
123 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
124 }catch (KeeperException e) {
125 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.KEEPERERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
// Delegates to ensurePathExists (declared elsewhere) for the given node name.
130 public void createNode(String nodeName) {
131 ensurePathExists(nodeName);
// Prepares a lock request under `dir`: calls ensurePathExists(dir) and builds a
// LockZooKeeperOperation for that directory.
// NOTE(review): the statements that run the operation and produce the returned String are not
// visible in this view -- confirm against the full file.
134 public String createLockId(String dir) {
135 ensurePathExists(dir);
// Single-argument constructor: no existing lock id is supplied for this operation.
136 LockZooKeeperOperation zop = new LockZooKeeperOperation(dir);
139 }catch (InterruptedException e) {
140 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
141 }catch (KeeperException e) {
142 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.KEEPERERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
148 * Attempts to acquire the exclusive write lock returning whether or not it was acquired. Note
149 * that the exclusive lock may be acquired some time later after this method has been invoked
150 * due to the current lock owner going away.
152 public synchronized boolean lock(String dir, String lockId)
153 throws KeeperException, InterruptedException {
// NOTE(review): the statements between the signature and the operation below are not visible in
// this view -- confirm against the full file.
// Builds the lock operation for an already-issued lockId under `dir`, runs it through
// retryOperation and returns that call's result cast to Boolean.
157 LockZooKeeperOperation zop = new LockZooKeeperOperation(dir, lockId);
158 return (Boolean) retryOperation(zop);
162 * Removes the lock or associated znode if you no longer require the lock. This also removes
163 * your request in the queue for locking in case you do not already hold the lock.
165 * @throws RuntimeException throws a runtime exception if it cannot connect to zookeeper.
166 * @throws NoNodeException if the lock znode does not exist when the delete is attempted
168 public synchronized void unlock(String lockId) throws RuntimeException, KeeperException.NoNodeException {
169 final String id = lockId;
// The delete is only set up when the service is not closed and a lock id was supplied.
170 if (!isClosed() && id != null) {
172 ZooKeeperOperation zopdel = new ZooKeeperOperation() {
173 public boolean execute() throws KeeperException, InterruptedException {
// Version -1 deletes the znode regardless of its current version.
174 zookeeper.delete(id, -1);
179 } catch (InterruptedException e) {
180 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
181 // set that we have been interrupted.
182 Thread.currentThread().interrupt();
183 } catch (KeeperException.NoNodeException e) {
// A fresh NoNodeException replaces the caught one; the caught exception is not attached as cause.
// NOTE(review): in the ZooKeeper API the String constructor argument of NoNodeException is a
// znode path, so this text would be reported as the path -- confirm that is intended.
185 throw new KeeperException.NoNodeException("Lock doesn't exists. Release lock operation failed.");
186 } catch (KeeperException e) {
187 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.KEEPERERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
// Any other KeeperException is rethrown unchecked with the original attached through initCause.
188 throw (RuntimeException) new RuntimeException(e.getMessage()).initCause(e);
// Returns the full path of the child of `mainLock` that sorts first as a ZNodeName, or the
// literal "No lock holder!" when none of the visible return paths above it is taken.
// NOTE(review): some statements of this method are not visible in this view (including the body
// of the NoNodeException handler) -- confirm against the full file.
193 public synchronized String currentLockHolder(String mainLock) {
194 final String id = mainLock;
// The lookup is only attempted when the service is not closed and a lock name was supplied.
195 if (!isClosed() && id != null) {
// Children are listed without setting a watch.
198 names = zookeeper.getChildren(id, false);
// Child names are turned into full paths and ordered by ZNodeName's natural ordering.
201 SortedSet<ZNodeName> sortedNames = new TreeSet<ZNodeName>();
202 for (String name : names) {
203 sortedNames.add(new ZNodeName(id + "/" + name));
205 return sortedNames.first().getName();
206 } catch (InterruptedException e) {
207 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
208 // set that we have been interrupted.
209 Thread.currentThread().interrupt();
210 } catch (KeeperException.NoNodeException e) {
212 } catch (KeeperException e) {
213 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.KEEPERERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
// Any other KeeperException is rethrown unchecked with the original attached through initCause.
214 throw (RuntimeException) new RuntimeException(e.getMessage()).initCause(e);
217 return "No lock holder!";
// Removes the lock znode `mainLock` together with its direct children.
// NOTE(review): the statement that actually runs `zopdel` is not visible in this view -- confirm.
220 public synchronized void deleteLock(String mainLock) {
221 final String id = mainLock;
// The deletion is only set up when the service is not closed and a lock name was supplied.
222 if (!isClosed() && id != null) {
224 ZooKeeperOperation zopdel = new ZooKeeperOperation() {
225 public boolean execute() throws KeeperException, InterruptedException {
// Children are deleted first, then the node itself; version -1 ignores the node versions.
226 List<String> names = zookeeper.getChildren(id, false);
227 for (String name : names) {
228 zookeeper.delete(id + "/" + name, -1);
230 zookeeper.delete(id, -1);
235 } catch (InterruptedException e) {
236 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
237 // set that we have been interrupted.
238 Thread.currentThread().interrupt();
239 } catch (KeeperException.NoNodeException e) {
// A missing node is logged here; no rethrow is visible in this handler.
240 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.KEEPERERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
242 } catch (KeeperException e) {
243 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.KEEPERERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
// Any other KeeperException is rethrown unchecked with the original attached through initCause.
244 throw (RuntimeException) new RuntimeException(e.getMessage()).initCause(e);
251 * A ZooKeeper operation that is mainly responsible for all the magic required for locking.
253 private class LockZooKeeperOperation implements ZooKeeperOperation {
256 * find if we have been created earlier if not create our node
258 * @param prefix the prefix node
259 * @param zookeeper the zookeeper client
260 * @param dir the dir parent
261 * @throws KeeperException
262 * @throws InterruptedException
// NOTE(review): the comment remnant above names `prefix` and `zookeeper` parameters that neither
// visible constructor takes -- confirm and prune against the full file.
// Path of this request's znode; starts as null and is assigned by execute() when it creates the node.
265 private String id = null;
// NOTE(review): body not visible in this view; presumably returns `id` -- confirm.
267 public String getId() {
// Operation for a new request under `dir`, with no id supplied. NOTE(review): body not visible.
271 public LockZooKeeperOperation(String dir) {
// Operation for an existing request id under `dir`. NOTE(review): body not visible; presumably
// stores both arguments -- confirm.
275 public LockZooKeeperOperation(String dir, String id) {
281 * the command that is run and retried for actually obtaining the lock
283 * @return if the command was successful or not
285 public boolean execute() throws KeeperException, InterruptedException {
// Runs the locking steps inside a do/while that repeats for as long as `id` is null.
// NOTE(review): several statements of this method, including any path that returns success, are
// not visible in this view; only the visible steps are annotated -- confirm against the full file.
// Step 1: create this request's znode under `dir` with the name prefix "x-"; PERSISTENT_SEQUENTIAL
// makes ZooKeeper append a sequence number, and the resulting path is kept in `id`.
288 String prefix = "x-";
289 byte[] data = {0x12, 0x34};
290 id = zookeeper.create(dir + "/" + prefix, data, getAcl(),
291 CreateMode.PERSISTENT_SEQUENTIAL);
293 if (logger.isDebugEnabled()) {
294 logger.debug(EELFLoggerDelegate.debugLogger, "Created id: " + id);
// Step 2: fetch the Stat of the node (no watch) so its creation time can be read.
299 stat = zookeeper.exists(id, false);
300 } catch (KeeperException | InterruptedException e1) {
// NOTE(review): the failure is only printed via printStackTrace rather than sent through `logger`;
// if `stat` is still unset afterwards the getCtime() call below would fail -- confirm.
301 e1.printStackTrace();
303 Long ctime = stat.getCtime();
// NOTE(review): writes to stdout instead of using `logger`.
304 System.out.println("Created id ....####"+ctime+"##.......id...:"+id);
// Step 3: record the creation time in MusicUtil.zkNodeMap and submit an INSERT into admin.locks
// through MusicCore.eventualPut.
305 MusicUtil.zkNodeMap.put(id, ctime);
306 PreparedQueryObject pQuery = new PreparedQueryObject();
307 pQuery.appendQueryString(
308 "INSERT INTO admin.locks(lock_id, ctime) VALUES (?,?)");
310 pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), id));
// NOTE(review): ctime is a Long but is converted with DataType.text() -- confirm the column type.
311 pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), ctime));
312 MusicCore.eventualPut(pQuery);
313 } catch (Exception e) {
// Step 4: list the children of `dir` without setting a watch.
320 List<String> names = zookeeper.getChildren(dir, false);
321 if (names.isEmpty()) {
322 logger.info(EELFLoggerDelegate.applicationLogger, "No children in: " + dir
323 + " when we've just " + "created one! Lets recreate it...");
324 // lets force the recreation of the id
326 return Boolean.FALSE;
329 // lets sort them explicitly (though they do seem to come back in order
331 ZNodeName idName = new ZNodeName(id);
332 SortedSet<ZNodeName> sortedNames = new TreeSet<ZNodeName>();
333 for (String name : names) {
334 sortedNames.add(new ZNodeName(dir + "/" + name));
// Our own node is not among the listed children: this attempt reports FALSE.
336 if (!sortedNames.contains(idName))
337 return Boolean.FALSE;
// headSet yields the entries that sort strictly before our own node.
339 SortedSet<ZNodeName> lessThanMe = sortedNames.headSet(idName);
340 if (!lessThanMe.isEmpty()) {
// The last element of that head set is the entry immediately preceding ours.
341 ZNodeName lastChildName = lessThanMe.last();
342 String lastChildId = lastChildName.getName();
343 if (logger.isDebugEnabled()) {
344 logger.debug(EELFLoggerDelegate.debugLogger, "watching less than me node: " + lastChildId);
// NOTE(review): the debug text says "watching", but exists() is called with watch=false here, so
// this call itself registers no watch -- confirm that is intended.
346 Stat stat = zookeeper.exists(lastChildId, false);
348 return Boolean.FALSE;
350 logger.info(EELFLoggerDelegate.applicationLogger,
351 "Could not find the" + " stats for less than me: "
352 + lastChildName.getName());
358 } while (id == null);
359 return Boolean.FALSE;