2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * Modifications Copyright (C) 2019 Nordix Foundation.
5 * ================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 * SPDX-License-Identifier: Apache-2.0
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.apex.plugins.context.locking.curator;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.locks.ReadWriteLock;
27 import org.apache.curator.framework.CuratorFramework;
28 import org.apache.curator.framework.CuratorFrameworkFactory;
29 import org.apache.curator.framework.state.ConnectionState;
30 import org.apache.curator.framework.state.ConnectionStateListener;
31 import org.apache.curator.retry.ExponentialBackoffRetry;
32 import org.apache.curator.utils.CloseableUtils;
33 import org.apache.zookeeper.CreateMode;
34 import org.onap.policy.apex.context.ContextException;
35 import org.onap.policy.apex.context.impl.locking.AbstractLockManager;
36 import org.onap.policy.apex.context.parameters.ContextParameterConstants;
37 import org.onap.policy.apex.context.parameters.LockManagerParameters;
38 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
39 import org.onap.policy.common.parameters.ParameterService;
40 import org.slf4j.ext.XLogger;
41 import org.slf4j.ext.XLoggerFactory;
44 * The Class CuratorLockManager manages the Curator interface towards Zookeeper for administering the Apex Context Album
47 public class CuratorLockManager extends AbstractLockManager {
48 // Logger for this class
49 private static final XLogger LOGGER = XLoggerFactory.getXLogger(CuratorLockManager.class);
51 // The Curator framework used for locking
52 private CuratorFramework curatorFramework;
54 // The address of the Zookeeper server
55 private String curatorZookeeperAddress;
58 * Constructor, set up a lock manager that uses Curator locking.
60 * @throws ContextException On errors connecting to Curator
62 public CuratorLockManager() throws ContextException {
63 LOGGER.entry("CuratorLockManager(): setting up the Curator lock manager . . .");
65 LOGGER.exit("CuratorLockManager(): Curator lock manager set up");
72 public void init(final AxArtifactKey key) throws ContextException {
73 LOGGER.entry("init(" + key + ")");
77 // Get the lock manager parameters
78 final LockManagerParameters lockParameters = ParameterService.get(ContextParameterConstants.LOCKING_GROUP_NAME);
80 if (!(lockParameters instanceof CuratorLockManagerParameters)) {
81 String message = "could not set up Curator locking, "
82 + "curator lock manager parameters are not set";
84 throw new ContextException(message);
87 final CuratorLockManagerParameters curatorLockPars = (CuratorLockManagerParameters)lockParameters;
89 // Check if the curator address has been set
90 curatorZookeeperAddress = curatorLockPars.getZookeeperAddress();
91 if (curatorZookeeperAddress == null || curatorZookeeperAddress.trim().length() == 0) {
92 String message = "could not set up Curator locking, "
93 + "check if the curator Zookeeper address parameter is set correctly";
95 throw new ContextException(message);
98 // Set up the curator framework we'll use
99 curatorFramework = CuratorFrameworkFactory.builder().connectString(curatorZookeeperAddress)
100 .retryPolicy(new ExponentialBackoffRetry(curatorLockPars.getZookeeperConnectSleepTime(),
101 curatorLockPars.getZookeeperContextRetries()))
104 // Listen for changes on the Curator connection
105 curatorFramework.getConnectionStateListenable().addListener(new CuratorManagerConnectionStateListener());
107 // Start the framework and specify Ephemeral nodes
108 curatorFramework.start();
110 // Wait for the connection to be made
112 curatorFramework.blockUntilConnected(
113 curatorLockPars.getZookeeperConnectSleepTime() * curatorLockPars.getZookeeperContextRetries(),
114 TimeUnit.MILLISECONDS);
115 } catch (final InterruptedException e) {
116 // restore the interrupt status
117 Thread.currentThread().interrupt();
118 String message = "error connecting to Zookeeper server at \"" + curatorZookeeperAddress
119 + "\", wait for connection timed out";
120 LOGGER.warn(message);
121 throw new ContextException(message);
124 if (!curatorFramework.getZookeeperClient().isConnected()) {
125 String message = "could not connect to Zookeeper server at \"" + curatorZookeeperAddress
126 + "\", see error log for details";
127 LOGGER.warn(message);
128 throw new ContextException(message);
131 // We'll use Ephemeral nodes for locks on the Zookeeper server
132 curatorFramework.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL);
134 LOGGER.exit("init(" + key + "," + curatorLockPars + ")");
141 public ReadWriteLock getReentrantReadWriteLock(final String lockId) throws ContextException {
142 // Check if the framework is active
143 if (curatorFramework != null && curatorFramework.getZookeeperClient().isConnected()) {
144 return new CuratorReentrantReadWriteLock(curatorFramework, "/" + lockId);
146 throw new ContextException("creation of lock using Zookeeper server at \"" + curatorZookeeperAddress
147 + "\", failed, see error log for details");
155 public void shutdown() {
156 if (curatorFramework == null) {
159 CloseableUtils.closeQuietly(curatorFramework);
160 curatorFramework = null;
164 * This class is a callback class for state changes on the curator to Zookeeper connection.
166 private class CuratorManagerConnectionStateListener implements ConnectionStateListener {
172 public void stateChanged(final CuratorFramework incomngCuratorFramework, final ConnectionState newState) {
173 // Is the state changed for this curator framework?
174 if (!incomngCuratorFramework.equals(curatorFramework)) {
178 LOGGER.info("curator state of client \"{}\" connected to \"{}\" changed to {}", curatorFramework,
179 curatorZookeeperAddress, newState);
181 if (newState != ConnectionState.CONNECTED) {