4ab0ca5e8e9faac521a2d86757c6ab5453b6387d
[policy/apex-pdp.git] / plugins / plugins-context / plugins-context-locking / plugins-context-locking-curator / src / main / java / org / onap / policy / apex / plugins / context / locking / curator / CuratorLockManager.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2019 Nordix Foundation.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.apex.plugins.context.locking.curator;
23
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.locks.ReadWriteLock;
26
27 import org.apache.curator.framework.CuratorFramework;
28 import org.apache.curator.framework.CuratorFrameworkFactory;
29 import org.apache.curator.framework.state.ConnectionState;
30 import org.apache.curator.framework.state.ConnectionStateListener;
31 import org.apache.curator.retry.ExponentialBackoffRetry;
32 import org.apache.curator.utils.CloseableUtils;
33 import org.apache.zookeeper.CreateMode;
34 import org.onap.policy.apex.context.ContextException;
35 import org.onap.policy.apex.context.impl.locking.AbstractLockManager;
36 import org.onap.policy.apex.context.parameters.ContextParameterConstants;
37 import org.onap.policy.apex.context.parameters.LockManagerParameters;
38 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
39 import org.onap.policy.common.parameters.ParameterService;
40 import org.slf4j.ext.XLogger;
41 import org.slf4j.ext.XLoggerFactory;
42
43 /**
44  * The Class CuratorLockManager manages the Curator interface towards Zookeeper for administering the Apex Context Album
45  * instance locks..
46  */
47 public class CuratorLockManager extends AbstractLockManager {
48     // Logger for this class
49     private static final XLogger LOGGER = XLoggerFactory.getXLogger(CuratorLockManager.class);
50
51     // The Curator framework used for locking
52     private CuratorFramework curatorFramework;
53
54     // The address of the Zookeeper server
55     private String curatorZookeeperAddress;
56
57     /**
58      * Constructor, set up a lock manager that uses Curator locking.
59      *
60      * @throws ContextException On errors connecting to Curator
61      */
62     public CuratorLockManager() throws ContextException {
63         LOGGER.entry("CuratorLockManager(): setting up the Curator lock manager . . .");
64
65         LOGGER.exit("CuratorLockManager(): Curator lock manager set up");
66     }
67
68     /**
69      * {@inheritDoc}.
70      */
71     @Override
72     public void init(final AxArtifactKey key) throws ContextException {
73         LOGGER.entry("init(" + key + ")");
74
75         super.init(key);
76
77         // Get the lock manager parameters
78         final LockManagerParameters lockParameters = ParameterService.get(ContextParameterConstants.LOCKING_GROUP_NAME);
79
80         if (!(lockParameters instanceof CuratorLockManagerParameters)) {
81             String message = "could not set up Curator locking, "
82                     + "curator lock manager parameters are not set";
83             LOGGER.warn(message);
84             throw new ContextException(message);
85         }
86
87         final CuratorLockManagerParameters curatorLockPars = (CuratorLockManagerParameters)lockParameters;
88
89         // Check if the curator address has been set
90         curatorZookeeperAddress = curatorLockPars.getZookeeperAddress();
91         if (curatorZookeeperAddress == null || curatorZookeeperAddress.trim().length() == 0) {
92             String message = "could not set up Curator locking, "
93                             + "check if the curator Zookeeper address parameter is set correctly";
94             LOGGER.warn(message);
95             throw new ContextException(message);
96         }
97
98         // Set up the curator framework we'll use
99         curatorFramework = CuratorFrameworkFactory.builder().connectString(curatorZookeeperAddress)
100                         .retryPolicy(new ExponentialBackoffRetry(curatorLockPars.getZookeeperConnectSleepTime(),
101                                         curatorLockPars.getZookeeperContextRetries()))
102                         .build();
103
104         // Listen for changes on the Curator connection
105         curatorFramework.getConnectionStateListenable().addListener(new CuratorManagerConnectionStateListener());
106
107         // Start the framework and specify Ephemeral nodes
108         curatorFramework.start();
109
110         // Wait for the connection to be made
111         try {
112             curatorFramework.blockUntilConnected(
113                     curatorLockPars.getZookeeperConnectSleepTime() * curatorLockPars.getZookeeperContextRetries(),
114                     TimeUnit.MILLISECONDS);
115         } catch (final InterruptedException e) {
116             // restore the interrupt status
117             Thread.currentThread().interrupt();
118             String message = "error connecting to Zookeeper server at \"" + curatorZookeeperAddress
119                             + "\", wait for connection timed out";
120             LOGGER.warn(message);
121             throw new ContextException(message);
122         }
123
124         if (!curatorFramework.getZookeeperClient().isConnected()) {
125             String message = "could not connect to Zookeeper server at \"" + curatorZookeeperAddress
126                             + "\", see error log for details";
127             LOGGER.warn(message);
128             throw new ContextException(message);
129         }
130
131         // We'll use Ephemeral nodes for locks on the Zookeeper server
132         curatorFramework.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL);
133
134         LOGGER.exit("init(" + key + "," + curatorLockPars + ")");
135     }
136
137     /**
138      * {@inheritDoc}.
139      */
140     @Override
141     public ReadWriteLock getReentrantReadWriteLock(final String lockId) throws ContextException {
142         // Check if the framework is active
143         if (curatorFramework != null && curatorFramework.getZookeeperClient().isConnected()) {
144             return new CuratorReentrantReadWriteLock(curatorFramework, "/" + lockId);
145         } else {
146             throw new ContextException("creation of lock using Zookeeper server at \"" + curatorZookeeperAddress
147                             + "\", failed, see error log for details");
148         }
149     }
150
151     /**
152      * {@inheritDoc}.
153      */
154     @Override
155     public void shutdown() {
156         if (curatorFramework == null) {
157             return;
158         }
159         CloseableUtils.closeQuietly(curatorFramework);
160         curatorFramework = null;
161     }
162
163     /**
164      * This class is a callback class for state changes on the curator to Zookeeper connection.
165      */
166     private class CuratorManagerConnectionStateListener implements ConnectionStateListener {
167
168         /**
169          * {@inheritDoc}.
170          */
171         @Override
172         public void stateChanged(final CuratorFramework incomngCuratorFramework, final ConnectionState newState) {
173             // Is the state changed for this curator framework?
174             if (!incomngCuratorFramework.equals(curatorFramework)) {
175                 return;
176             }
177
178             LOGGER.info("curator state of client \"{}\" connected to \"{}\" changed to {}", curatorFramework,
179                             curatorZookeeperAddress, newState);
180
181             if (newState != ConnectionState.CONNECTED) {
182                 shutdown();
183             }
184         }
185     }
186 }