first commit for new repo
[sdc/sdc-titan-cassandra.git] / src / main / java / com / thinkaurelius / titan / diskstorage / cassandra / thrift / thriftpool / CTConnectionFactory.java
1 package com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool;
2
3 import org.apache.cassandra.auth.IAuthenticator;
4 import org.apache.cassandra.thrift.*;
5 import org.apache.commons.lang.StringUtils;
6 import org.apache.commons.pool.KeyedPoolableObjectFactory;
7 import org.apache.thrift.protocol.TBinaryProtocol;
8 import org.apache.thrift.transport.*;
9 import org.slf4j.Logger;
10 import org.slf4j.LoggerFactory;
11
12 import java.util.*;
13 import java.util.concurrent.atomic.AtomicReference;
14
15 /**
16  * A factory compatible with Apache commons-pool for Cassandra Thrift API
17  * connections.
18  *
19  * @author Dan LaRocque <dalaro@hopcount.org>
20  */
21 public class CTConnectionFactory implements KeyedPoolableObjectFactory<String, CTConnection> {
22
23     private static final Logger log = LoggerFactory.getLogger(CTConnectionFactory.class);
24     private static final long SCHEMA_WAIT_MAX = 5000L;
25     private static final long SCHEMA_WAIT_INCREMENT = 25L;
26
27     private final AtomicReference<Config> cfgRef;
28
29     private CTConnectionFactory(Config config) {
30         this.cfgRef = new AtomicReference<Config>(config);
31     }
32
33     @Override
34     public void activateObject(String key, CTConnection c) throws Exception {
35         // Do nothing, as in passivateObject
36     }
37
38     @Override
39     public void destroyObject(String key, CTConnection c) throws Exception {
40         TTransport t = c.getTransport();
41
42         if (t.isOpen()) {
43             t.close();
44             log.trace("Closed transport {}", t);
45         } else {
46             log.trace("Not closing transport {} (already closed)", t);
47         }
48     }
49
50     @Override
51     public CTConnection makeObject(String key) throws Exception {
52         CTConnection conn = makeRawConnection();
53         Cassandra.Client client = conn.getClient();
54         client.set_keyspace(key);
55
56         return conn;
57     }
58
59     /**
60      * Create a Cassandra-Thrift connection, but do not attempt to
61      * set a keyspace on the connection.
62      *
63      * @return A CTConnection ready to talk to a Cassandra cluster
64      * @throws TTransportException on any Thrift transport failure
65      */
66     public CTConnection makeRawConnection() throws TTransportException {
67         final Config cfg = cfgRef.get();
68
69         String hostname = cfg.getRandomHost();
70
71         log.debug("Creating TSocket({}, {}, {}, {}, {})", hostname, cfg.port, cfg.username, cfg.password, cfg.timeoutMS);
72
73         TSocket socket;
74         if (null != cfg.sslTruststoreLocation && !cfg.sslTruststoreLocation.isEmpty()) {
75             TSSLTransportFactory.TSSLTransportParameters params = new TSSLTransportFactory.TSSLTransportParameters() {{
76                setTrustStore(cfg.sslTruststoreLocation, cfg.sslTruststorePassword);
77             }};
78             socket = TSSLTransportFactory.getClientSocket(hostname, cfg.port, cfg.timeoutMS, params);
79         } else {
80             socket = new TSocket(hostname, cfg.port, cfg.timeoutMS);
81         }
82
83         TTransport transport = new TFramedTransport(socket, cfg.frameSize);
84         log.trace("Created transport {}", transport);
85         TBinaryProtocol protocol = new TBinaryProtocol(transport);
86         Cassandra.Client client = new Cassandra.Client(protocol);
87         if (!transport.isOpen()) {
88             transport.open();
89         }
90
91         if (cfg.username != null) {
92             Map<String, String> credentials = new HashMap<String, String>() {{
93                 put(IAuthenticator.USERNAME_KEY, cfg.username);
94                 put(IAuthenticator.PASSWORD_KEY, cfg.password);
95             }};
96
97             try {
98                 client.login(new AuthenticationRequest(credentials));
99             } catch (Exception e) { // TTransportException will propagate authentication/authorization failure
100                 throw new TTransportException(e);
101             }
102         }
103         return new CTConnection(transport, client, cfg);
104     }
105
106     @Override
107     public void passivateObject(String key, CTConnection o) throws Exception {
108         // Do nothing, as in activateObject
109     }
110
111     @Override
112     public boolean validateObject(String key, CTConnection c) {
113         Config curCfg = cfgRef.get();
114
115         boolean isSameConfig = c.getConfig().equals(curCfg);
116         if (log.isDebugEnabled()) {
117             if (isSameConfig) {
118                 log.trace("Validated {} by configuration {}", c, curCfg);
119             } else {
120                 log.trace("Rejected {}; current config is {}; rejected connection config is {}",
121                           c, curCfg, c.getConfig());
122             }
123         }
124
125         return isSameConfig && c.isOpen();
126     }
127
128     public static class Config {
129
130         private final String[] hostnames;
131         private final int port;
132         private final String username;
133         private final String password;
134         private final Random random;
135
136         private int timeoutMS;
137         private int frameSize;
138
139         private String sslTruststoreLocation;
140         private String sslTruststorePassword;
141
142         private boolean isBuilt;
143
144         public Config(String[] hostnames, int port, String username, String password) {
145             this.hostnames = hostnames;
146             this.port = port;
147             this.username = username;
148             this.password = password;
149             this.random = new Random();
150         }
151
152         // TODO: we don't really need getters/setters here as all of the fields are final and immutable
153
154         public String getHostname() {
155             return hostnames[0];
156         }
157
158         public int getPort() {
159             return port;
160         }
161
162         public String getRandomHost() {
163             return hostnames.length == 1 ? hostnames[0] : hostnames[random.nextInt(hostnames.length)];
164         }
165
166         public Config setTimeoutMS(int timeoutMS) {
167             checkIfAlreadyBuilt();
168             this.timeoutMS = timeoutMS;
169             return this;
170         }
171
172         public Config setFrameSize(int frameSize) {
173             checkIfAlreadyBuilt();
174             this.frameSize = frameSize;
175             return this;
176         }
177
178         public Config setSSLTruststoreLocation(String location) {
179             checkIfAlreadyBuilt();
180             this.sslTruststoreLocation = location;
181             return this;
182         }
183
184         public Config setSSLTruststorePassword(String password) {
185             checkIfAlreadyBuilt();
186             this.sslTruststorePassword = password;
187             return this;
188         }
189
190         public CTConnectionFactory build() {
191             isBuilt = true;
192             return new CTConnectionFactory(this);
193         }
194
195
196         public void checkIfAlreadyBuilt() {
197             if (isBuilt)
198                 throw new IllegalStateException("Can't accept modifications when used with built factory.");
199         }
200
201         @Override
202         public String toString() {
203             return "Config[hostnames=" + StringUtils.join(hostnames, ',') + ", port=" + port
204                     + ", timeoutMS=" + timeoutMS + ", frameSize=" + frameSize
205                     + "]";
206         }
207     }
208
209 }
210