477a010fa81ba245a2a46079a5a75a33112763b7
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  * 
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  * 
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * 
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.plugins.context.locking.curator;
22
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.locks.ReadWriteLock;
25
26 import org.apache.curator.framework.CuratorFramework;
27 import org.apache.curator.framework.CuratorFrameworkFactory;
28 import org.apache.curator.framework.state.ConnectionState;
29 import org.apache.curator.framework.state.ConnectionStateListener;
30 import org.apache.curator.retry.ExponentialBackoffRetry;
31 import org.apache.curator.utils.CloseableUtils;
32 import org.apache.zookeeper.CreateMode;
33 import org.onap.policy.apex.context.ContextException;
34 import org.onap.policy.apex.context.impl.locking.AbstractLockManager;
35 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
36 import org.onap.policy.apex.model.basicmodel.service.ParameterService;
37 import org.slf4j.ext.XLogger;
38 import org.slf4j.ext.XLoggerFactory;
39
40 /**
41  * The Class CuratorLockManager manages the Curator interface towards Zookeeper for administering
42  * the Apex Context Album instance locks..
43  */
44 public class CuratorLockManager extends AbstractLockManager {
45     // Logger for this class
46     private static final XLogger LOGGER = XLoggerFactory.getXLogger(CuratorLockManager.class);
47
48     // The Curator framework used for locking
49     private CuratorFramework curatorFramework;
50
51     // The address of the Zookeeper server
52     private String curatorZookeeperAddress;
53
54     /**
55      * Constructor, set up a lock manager that uses Curator locking.
56      *
57      * @throws ContextException On errors connecting to Curator
58      */
59     public CuratorLockManager() throws ContextException {
60         LOGGER.entry("CuratorLockManager(): setting up the Curator lock manager . . .");
61
62         LOGGER.exit("CuratorLockManager(): Curator lock manager set up");
63     }
64
65     /*
66      * (non-Javadoc)
67      *
68      * @see org.onap.policy.apex.context.impl.locking.AbstractLockManager#init(org.onap.policy.apex.
69      * model. basicmodel.concepts.AxArtifactKey)
70      */
71     @Override
72     public void init(final AxArtifactKey key) throws ContextException {
73         LOGGER.entry("init(" + key + ")");
74
75         super.init(key);
76
77         // Get the lock manager parameters
78         final CuratorLockManagerParameters lockParameters =
79                 ParameterService.getParameters(CuratorLockManagerParameters.class);
80
81         // Check if the curator address has been set
82         curatorZookeeperAddress = lockParameters.getZookeeperAddress();
83         if (curatorZookeeperAddress == null || curatorZookeeperAddress.trim().length() == 0) {
84             LOGGER.warn(
85                     "could not set up Curator locking, check if the curator Zookeeper address parameter is set correctly");
86             throw new ContextException(
87                     "could not set up Curator locking, check if the curator Zookeeper address parameter is set correctly");
88         }
89
90         // Set up the curator framework we'll use
91         curatorFramework = CuratorFrameworkFactory.builder().connectString(curatorZookeeperAddress)
92                 .retryPolicy(new ExponentialBackoffRetry(lockParameters.getZookeeperConnectSleepTime(),
93                         lockParameters.getZookeeperContextRetries()))
94                 .build();
95
96         // Listen for changes on the Curator connection
97         curatorFramework.getConnectionStateListenable().addListener(new CuratorManagerConnectionStateListener());
98
99         // Start the framework and specify Ephemeral nodes
100         curatorFramework.start();
101
102         // Wait for the connection to be made
103         try {
104             curatorFramework.blockUntilConnected(
105                     lockParameters.getZookeeperConnectSleepTime() * lockParameters.getZookeeperContextRetries(),
106                     TimeUnit.MILLISECONDS);
107         } catch (final InterruptedException e) {
108             // restore the interrupt status
109             Thread.currentThread().interrupt();
110             LOGGER.warn("could not connect to Zookeeper server at \"" + curatorZookeeperAddress
111                     + "\", wait for connection timed out");
112             throw new ContextException("could not connect to Zookeeper server at \"" + curatorZookeeperAddress
113                     + "\", wait for connection timed out");
114         }
115
116         if (!curatorFramework.getZookeeperClient().isConnected()) {
117             LOGGER.warn("could not connect to Zookeeper server at \"" + curatorZookeeperAddress
118                     + "\", see error log for details");
119             throw new ContextException("could not connect to Zookeeper server at \"" + curatorZookeeperAddress
120                     + "\", see error log for details");
121         }
122
123         // We'll use Ephemeral nodes for locks on the Zookeeper server
124         curatorFramework.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL);
125
126         LOGGER.exit("init(" + key + "," + lockParameters + ")");
127     }
128
129     /*
130      * (non-Javadoc)
131      *
132      * @see
133      * org.onap.policy.apex.core.context.impl.locking.AbstractLockManager#getReentrantReadWriteLock(
134      * java.lang.String)
135      */
136     @Override
137     public ReadWriteLock getReentrantReadWriteLock(final String lockId) throws ContextException {
138         // Check if the framework is active
139         if (curatorFramework != null && curatorFramework.getZookeeperClient().isConnected()) {
140             return new CuratorReentrantReadWriteLock(curatorFramework, "/" + lockId);
141         } else {
142             throw new ContextException("creation of lock using Zookeeper server at \"" + curatorZookeeperAddress
143                     + "\", failed, see error log for details");
144         }
145     }
146
147     /*
148      * (non-Javadoc)
149      *
150      * @see org.onap.policy.apex.core.context.LockManager#shutdown()
151      */
152     @Override
153     public void shutdown() {
154         if (curatorFramework == null) {
155             return;
156         }
157         CloseableUtils.closeQuietly(curatorFramework);
158         curatorFramework = null;
159     }
160
161     /**
162      * This class is a callback class for state changes on the curator to Zookeeper connection.
163      */
164     private class CuratorManagerConnectionStateListener implements ConnectionStateListener {
165
166         /*
167          * (non-Javadoc)
168          *
169          * @see org.apache.curator.framework.state.ConnectionStateListener#stateChanged(org.apache.
170          * curator.framework.CuratorFramework, org.apache.curator.framework.state.ConnectionState)
171          */
172         @Override
173         public void stateChanged(final CuratorFramework incomngCuratorFramework, final ConnectionState newState) {
174             // Is the state changed for this curator framework?
175             if (!incomngCuratorFramework.equals(curatorFramework)) {
176                 return;
177             }
178
179             LOGGER.info("curator state of client \"" + curatorFramework + "\" connected to \"" + curatorZookeeperAddress
180                     + "\" changed to " + newState);
181
182             if (newState != ConnectionState.CONNECTED) {
183                 shutdown();
184             }
185         }
186     }
187 }