ce727b8bf999c6c35356f1230b6b4720dd7e6bd3
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2019 Nordix Foundation.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.apex.plugins.context.locking.curator;
23
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.locks.ReadWriteLock;
26
27 import org.apache.curator.framework.CuratorFramework;
28 import org.apache.curator.framework.CuratorFrameworkFactory;
29 import org.apache.curator.framework.state.ConnectionState;
30 import org.apache.curator.framework.state.ConnectionStateListener;
31 import org.apache.curator.retry.ExponentialBackoffRetry;
32 import org.apache.curator.utils.CloseableUtils;
33 import org.apache.zookeeper.CreateMode;
34 import org.onap.policy.apex.context.ContextException;
35 import org.onap.policy.apex.context.impl.locking.AbstractLockManager;
36 import org.onap.policy.apex.context.parameters.ContextParameterConstants;
37 import org.onap.policy.apex.context.parameters.LockManagerParameters;
38 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
39 import org.onap.policy.common.parameters.ParameterService;
40 import org.slf4j.ext.XLogger;
41 import org.slf4j.ext.XLoggerFactory;
42
43 /**
44  * The Class CuratorLockManager manages the Curator interface towards Zookeeper for administering the Apex Context Album
45  * instance locks..
46  */
47 public class CuratorLockManager extends AbstractLockManager {
48     // Logger for this class
49     private static final XLogger LOGGER = XLoggerFactory.getXLogger(CuratorLockManager.class);
50
51     // The Curator framework used for locking
52     private CuratorFramework curatorFramework;
53
54     // The address of the Zookeeper server
55     private String curatorZookeeperAddress;
56
57     /**
58      * Constructor, set up a lock manager that uses Curator locking.
59      *
60      * @throws ContextException On errors connecting to Curator
61      */
62     public CuratorLockManager() throws ContextException {
63         LOGGER.entry("CuratorLockManager(): setting up the Curator lock manager . . .");
64
65         LOGGER.exit("CuratorLockManager(): Curator lock manager set up");
66     }
67
68     /*
69      * (non-Javadoc)
70      *
71      * @see org.onap.policy.apex.context.impl.locking.AbstractLockManager#init(org.onap.policy.apex. model.
72      * basicmodel.concepts.AxArtifactKey)
73      */
74     @Override
75     public void init(final AxArtifactKey key) throws ContextException {
76         LOGGER.entry("init(" + key + ")");
77
78         super.init(key);
79
80         // Get the lock manager parameters
81         final LockManagerParameters lockParameters = ParameterService.get(ContextParameterConstants.LOCKING_GROUP_NAME);
82
83         if (!(lockParameters instanceof CuratorLockManagerParameters)) {
84             String message = "could not set up Curator locking, "
85                     + "curator lock manager parameters are not set";
86             LOGGER.warn(message);
87             throw new ContextException(message);
88         }
89
90         final CuratorLockManagerParameters curatorLockPars = (CuratorLockManagerParameters)lockParameters;
91
92         // Check if the curator address has been set
93         curatorZookeeperAddress = curatorLockPars.getZookeeperAddress();
94         if (curatorZookeeperAddress == null || curatorZookeeperAddress.trim().length() == 0) {
95             String message = "could not set up Curator locking, "
96                             + "check if the curator Zookeeper address parameter is set correctly";
97             LOGGER.warn(message);
98             throw new ContextException(message);
99         }
100
101         // Set up the curator framework we'll use
102         curatorFramework = CuratorFrameworkFactory.builder().connectString(curatorZookeeperAddress)
103                         .retryPolicy(new ExponentialBackoffRetry(curatorLockPars.getZookeeperConnectSleepTime(),
104                                         curatorLockPars.getZookeeperContextRetries()))
105                         .build();
106
107         // Listen for changes on the Curator connection
108         curatorFramework.getConnectionStateListenable().addListener(new CuratorManagerConnectionStateListener());
109
110         // Start the framework and specify Ephemeral nodes
111         curatorFramework.start();
112
113         // Wait for the connection to be made
114         try {
115             curatorFramework.blockUntilConnected(
116                     curatorLockPars.getZookeeperConnectSleepTime() * curatorLockPars.getZookeeperContextRetries(),
117                     TimeUnit.MILLISECONDS);
118         } catch (final InterruptedException e) {
119             // restore the interrupt status
120             Thread.currentThread().interrupt();
121             String message = "error connecting to Zookeeper server at \"" + curatorZookeeperAddress
122                             + "\", wait for connection timed out";
123             LOGGER.warn(message);
124             throw new ContextException(message);
125         }
126
127         if (!curatorFramework.getZookeeperClient().isConnected()) {
128             String message = "could not connect to Zookeeper server at \"" + curatorZookeeperAddress
129                             + "\", see error log for details";
130             LOGGER.warn(message);
131             throw new ContextException(message);
132         }
133
134         // We'll use Ephemeral nodes for locks on the Zookeeper server
135         curatorFramework.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL);
136
137         LOGGER.exit("init(" + key + "," + curatorLockPars + ")");
138     }
139
140     /*
141      * (non-Javadoc)
142      *
143      * @see org.onap.policy.apex.core.context.impl.locking.AbstractLockManager#getReentrantReadWriteLock(
144      * java.lang.String)
145      */
146     @Override
147     public ReadWriteLock getReentrantReadWriteLock(final String lockId) throws ContextException {
148         // Check if the framework is active
149         if (curatorFramework != null && curatorFramework.getZookeeperClient().isConnected()) {
150             return new CuratorReentrantReadWriteLock(curatorFramework, "/" + lockId);
151         } else {
152             throw new ContextException("creation of lock using Zookeeper server at \"" + curatorZookeeperAddress
153                             + "\", failed, see error log for details");
154         }
155     }
156
157     /*
158      * (non-Javadoc)
159      *
160      * @see org.onap.policy.apex.core.context.LockManager#shutdown()
161      */
162     @Override
163     public void shutdown() {
164         if (curatorFramework == null) {
165             return;
166         }
167         CloseableUtils.closeQuietly(curatorFramework);
168         curatorFramework = null;
169     }
170
171     /**
172      * This class is a callback class for state changes on the curator to Zookeeper connection.
173      */
174     private class CuratorManagerConnectionStateListener implements ConnectionStateListener {
175
176         /*
177          * (non-Javadoc)
178          *
179          * @see org.apache.curator.framework.state.ConnectionStateListener#stateChanged(org.apache.
180          * curator.framework.CuratorFramework, org.apache.curator.framework.state.ConnectionState)
181          */
182         @Override
183         public void stateChanged(final CuratorFramework incomngCuratorFramework, final ConnectionState newState) {
184             // Is the state changed for this curator framework?
185             if (!incomngCuratorFramework.equals(curatorFramework)) {
186                 return;
187             }
188
189             LOGGER.info("curator state of client \"{}\" connected to \"{}\" changed to {}", curatorFramework,
190                             curatorZookeeperAddress, newState);
191
192             if (newState != ConnectionState.CONNECTED) {
193                 shutdown();
194             }
195         }
196     }
197 }