1 package com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool;
3 import org.apache.cassandra.auth.IAuthenticator;
4 import org.apache.cassandra.thrift.*;
5 import org.apache.commons.lang.StringUtils;
6 import org.apache.commons.pool.KeyedPoolableObjectFactory;
7 import org.apache.thrift.protocol.TBinaryProtocol;
8 import org.apache.thrift.transport.*;
9 import org.slf4j.Logger;
10 import org.slf4j.LoggerFactory;
13 import java.util.concurrent.atomic.AtomicReference;
/**
 * A factory compatible with Apache commons-pool for Cassandra Thrift API
 * connections.
 *
 * @author Dan LaRocque <dalaro@hopcount.org>
 */
21 public class CTConnectionFactory implements KeyedPoolableObjectFactory<String, CTConnection> {
// Class logger; the methods of this factory emit debug/trace messages through it.
23 private static final Logger log = LoggerFactory.getLogger(CTConnectionFactory.class);
// NOTE(review): not referenced in the visible part of this file; presumably an upper bound
// (milliseconds?) on some schema wait — confirm against the rest of the file.
24 private static final long SCHEMA_WAIT_MAX = 5000L;
// NOTE(review): likewise unreferenced in the visible part; presumably the polling step
// that goes with SCHEMA_WAIT_MAX — confirm.
25 private static final long SCHEMA_WAIT_INCREMENT = 25L;
// Holds the Config that makeRawConnection() reads to build connections and that
// validateObject() compares pooled connections against.
27 private final AtomicReference<Config> cfgRef;
/**
 * Creates a factory that builds connections from the given configuration.
 * The constructor is private; {@code Config.build()} is the visible caller.
 *
 * @param config configuration wrapped in the factory's atomic reference
 */
private CTConnectionFactory(Config config) {
    this.cfgRef = new AtomicReference<Config>(config);
}
34 public void activateObject(String key, CTConnection c) throws Exception {
35 // Do nothing, as in passivateObject
39 public void destroyObject(String key, CTConnection c) throws Exception {
40 TTransport t = c.getTransport();
44 log.trace("Closed transport {}", t);
46 log.trace("Not closing transport {} (already closed)", t);
51 public CTConnection makeObject(String key) throws Exception {
52 CTConnection conn = makeRawConnection();
53 Cassandra.Client client = conn.getClient();
54 client.set_keyspace(key);
60 * Create a Cassandra-Thrift connection, but do not attempt to
61 * set a keyspace on the connection.
63 * @return A CTConnection ready to talk to a Cassandra cluster
64 * @throws TTransportException on any Thrift transport failure
66 public CTConnection makeRawConnection() throws TTransportException {
67 final Config cfg = cfgRef.get();
69 String hostname = cfg.getRandomHost();
71 log.debug("Creating TSocket({}, {}, {}, {}, {})", hostname, cfg.port, cfg.username, cfg.password, cfg.timeoutMS);
74 if (null != cfg.sslTruststoreLocation && !cfg.sslTruststoreLocation.isEmpty()) {
75 TSSLTransportFactory.TSSLTransportParameters params = new TSSLTransportFactory.TSSLTransportParameters() {{
76 setTrustStore(cfg.sslTruststoreLocation, cfg.sslTruststorePassword);
78 socket = TSSLTransportFactory.getClientSocket(hostname, cfg.port, cfg.timeoutMS, params);
80 socket = new TSocket(hostname, cfg.port, cfg.timeoutMS);
83 TTransport transport = new TFramedTransport(socket, cfg.frameSize);
84 log.trace("Created transport {}", transport);
85 TBinaryProtocol protocol = new TBinaryProtocol(transport);
86 Cassandra.Client client = new Cassandra.Client(protocol);
87 if (!transport.isOpen()) {
91 if (cfg.username != null) {
92 Map<String, String> credentials = new HashMap<String, String>() {{
93 put(IAuthenticator.USERNAME_KEY, cfg.username);
94 put(IAuthenticator.PASSWORD_KEY, cfg.password);
98 client.login(new AuthenticationRequest(credentials));
99 } catch (Exception e) { // TTransportException will propagate authentication/authorization failure
100 throw new TTransportException(e);
103 return new CTConnection(transport, client, cfg);
107 public void passivateObject(String key, CTConnection o) throws Exception {
108 // Do nothing, as in activateObject
112 public boolean validateObject(String key, CTConnection c) {
113 Config curCfg = cfgRef.get();
115 boolean isSameConfig = c.getConfig().equals(curCfg);
116 if (log.isDebugEnabled()) {
118 log.trace("Validated {} by configuration {}", c, curCfg);
120 log.trace("Rejected {}; current config is {}; rejected connection config is {}",
121 c, curCfg, c.getConfig());
125 return isSameConfig && c.isOpen();
128 public static class Config {
// Candidate host names; getRandomHost() returns one of them.
130 private final String[] hostnames;
// Port passed to the Thrift socket in makeRawConnection().
131 private final int port;
// Login name; makeRawConnection() only performs a login when this is non-null.
132 private final String username;
// Password sent with the login credentials.
133 private final String password;
// Random source used by getRandomHost() when more than one host is configured.
134 private final Random random;

// Timeout passed to the Thrift socket; NOTE(review): presumably milliseconds, per the name — confirm.
136 private int timeoutMS;
// Frame size passed to the framed transport that wraps the socket.
137 private int frameSize;

// Truststore location; a non-null, non-empty value makes makeRawConnection() use an SSL socket.
139 private String sslTruststoreLocation;
// Password passed together with the truststore location.
140 private String sslTruststorePassword;

// NOTE(review): presumably set by build() and tested by checkIfAlreadyBuilt(); neither the
// assignment nor the test is visible in this excerpt — confirm.
142 private boolean isBuilt;
/**
 * Creates a configuration with the settings that cannot change afterwards.
 * <p>
 * The host array is copied, so later changes to the caller's array do not
 * alter which hosts this configuration hands out.
 *
 * @param hostnames candidate host names; one is chosen per connection
 * @param port      port used for every host
 * @param username  login name, or {@code null} to skip the login step
 * @param password  password sent with the login credentials
 */
public Config(String[] hostnames, int port, String username, String password) {
    // Defensive copy; null is passed through unchanged, as before.
    this.hostnames = (hostnames == null) ? null : hostnames.clone();
    this.port = port;
    this.username = username;
    this.password = password;
    this.random = new Random();
}
// TODO: the final fields (hostnames, port, username, password) do not really need getters;
// the remaining fields are not final and are assigned through the setters below.
// Accessor declared to return a single host name as a String.
// NOTE(review): the method body is not visible in this excerpt, so which entry of
// `hostnames` it returns is unconfirmed — verify before relying on it.
154 public String getHostname() {
// Accessor declared to return an int port.
// NOTE(review): the method body is not visible in this excerpt; presumably it returns the
// `port` field — confirm.
158 public int getPort() {
162 public String getRandomHost() {
163 return hostnames.length == 1 ? hostnames[0] : hostnames[random.nextInt(hostnames.length)];
166 public Config setTimeoutMS(int timeoutMS) {
167 checkIfAlreadyBuilt();
168 this.timeoutMS = timeoutMS;
172 public Config setFrameSize(int frameSize) {
173 checkIfAlreadyBuilt();
174 this.frameSize = frameSize;
178 public Config setSSLTruststoreLocation(String location) {
179 checkIfAlreadyBuilt();
180 this.sslTruststoreLocation = location;
184 public Config setSSLTruststorePassword(String password) {
185 checkIfAlreadyBuilt();
186 this.sslTruststorePassword = password;
190 public CTConnectionFactory build() {
192 return new CTConnectionFactory(this);
196 public void checkIfAlreadyBuilt() {
198 throw new IllegalStateException("Can't accept modifications when used with built factory.");
202 public String toString() {
203 return "Config[hostnames=" + StringUtils.join(hostnames, ',') + ", port=" + port
204 + ", timeoutMS=" + timeoutMS + ", frameSize=" + frameSize