2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * Modifications Copyright (C) 2019 Nordix Foundation.
5 * ================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 * SPDX-License-Identifier: Apache-2.0
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.apex.plugins.context.locking.curator;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.locks.ReadWriteLock;
26 import org.apache.curator.framework.CuratorFramework;
27 import org.apache.curator.framework.CuratorFrameworkFactory;
28 import org.apache.curator.framework.state.ConnectionState;
29 import org.apache.curator.framework.state.ConnectionStateListener;
30 import org.apache.curator.retry.ExponentialBackoffRetry;
31 import org.apache.curator.utils.CloseableUtils;
32 import org.apache.zookeeper.CreateMode;
33 import org.onap.policy.apex.context.ContextException;
34 import org.onap.policy.apex.context.impl.locking.AbstractLockManager;
35 import org.onap.policy.apex.context.parameters.ContextParameterConstants;
36 import org.onap.policy.apex.context.parameters.LockManagerParameters;
37 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
38 import org.onap.policy.common.parameters.ParameterService;
39 import org.slf4j.ext.XLogger;
40 import org.slf4j.ext.XLoggerFactory;
43 * The Class CuratorLockManager manages the Curator interface towards Zookeeper for administering the Apex Context Album
46 public class CuratorLockManager extends AbstractLockManager {
47 // Logger for this class
48 private static final XLogger LOGGER = XLoggerFactory.getXLogger(CuratorLockManager.class);
50 // The Curator framework used for locking
51 private CuratorFramework curatorFramework;
53 // The address of the Zookeeper server
54 private String curatorZookeeperAddress;
57 * Constructor, set up a lock manager that uses Curator locking.
59 * @throws ContextException On errors connecting to Curator
61 public CuratorLockManager() throws ContextException {
62 LOGGER.entry("CuratorLockManager(): setting up the Curator lock manager . . .");
64 LOGGER.exit("CuratorLockManager(): Curator lock manager set up");
71 public void init(final AxArtifactKey key) throws ContextException {
72 LOGGER.entry("init(" + key + ")");
76 // Get the lock manager parameters
77 final LockManagerParameters lockParameters = ParameterService.get(ContextParameterConstants.LOCKING_GROUP_NAME);
79 if (!(lockParameters instanceof CuratorLockManagerParameters)) {
80 String message = "could not set up Curator locking, "
81 + "curator lock manager parameters are not set";
83 throw new ContextException(message);
86 final CuratorLockManagerParameters curatorLockPars = (CuratorLockManagerParameters) lockParameters;
88 // Check if the curator address has been set
89 curatorZookeeperAddress = curatorLockPars.getZookeeperAddress();
90 if (curatorZookeeperAddress == null || curatorZookeeperAddress.trim().length() == 0) {
91 String message = "could not set up Curator locking, "
92 + "check if the curator Zookeeper address parameter is set correctly";
94 throw new ContextException(message);
97 // Set up the curator framework we'll use
98 curatorFramework = CuratorFrameworkFactory.builder().connectString(curatorZookeeperAddress)
99 .retryPolicy(new ExponentialBackoffRetry(curatorLockPars.getZookeeperConnectSleepTime(),
100 curatorLockPars.getZookeeperContextRetries()))
103 // Listen for changes on the Curator connection
104 curatorFramework.getConnectionStateListenable().addListener(new CuratorManagerConnectionStateListener());
106 // Start the framework and specify Ephemeral nodes
107 curatorFramework.start();
109 // Wait for the connection to be made
111 curatorFramework.blockUntilConnected(
112 curatorLockPars.getZookeeperConnectSleepTime() * curatorLockPars.getZookeeperContextRetries(),
113 TimeUnit.MILLISECONDS);
114 } catch (final InterruptedException e) {
115 // restore the interrupt status
116 Thread.currentThread().interrupt();
117 String message = "error connecting to Zookeeper server at \"" + curatorZookeeperAddress
118 + "\", wait for connection timed out";
119 LOGGER.warn(message);
120 throw new ContextException(message);
123 if (!curatorFramework.getZookeeperClient().isConnected()) {
124 String message = "could not connect to Zookeeper server at \"" + curatorZookeeperAddress
125 + "\", see error log for details";
126 LOGGER.warn(message);
127 throw new ContextException(message);
130 // We'll use Ephemeral nodes for locks on the Zookeeper server
131 curatorFramework.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL);
133 LOGGER.exit("init(" + key + "," + curatorLockPars + ")");
140 public ReadWriteLock getReentrantReadWriteLock(final String lockId) throws ContextException {
141 // Check if the framework is active
142 if (curatorFramework != null && curatorFramework.getZookeeperClient().isConnected()) {
143 return new CuratorReentrantReadWriteLock(curatorFramework, "/" + lockId);
145 throw new ContextException("creation of lock using Zookeeper server at \"" + curatorZookeeperAddress
146 + "\", failed, see error log for details");
154 public void shutdown() {
155 if (curatorFramework == null) {
158 CloseableUtils.closeQuietly(curatorFramework);
159 curatorFramework = null;
163 * This class is a callback class for state changes on the curator to Zookeeper connection.
165 private class CuratorManagerConnectionStateListener implements ConnectionStateListener {
171 public void stateChanged(final CuratorFramework incomngCuratorFramework, final ConnectionState newState) {
172 // Is the state changed for this curator framework?
173 if (!incomngCuratorFramework.equals(curatorFramework)) {
177 LOGGER.info("curator state of client \"{}\" connected to \"{}\" changed to {}", curatorFramework,
178 curatorZookeeperAddress, newState);
180 if (newState != ConnectionState.CONNECTED) {