1 /*global Buffer require exports console setTimeout */
3 var net = require("net"),
4 util = require("./lib/util"),
5 Queue = require("./lib/queue"),
6 to_array = require("./lib/to_array"),
7 events = require("events"),
8 crypto = require("crypto"),
9 parsers = [], commands,
12 default_host = "127.0.0.1";
14 // can set this to true to enable for all connections
15 exports.debug_mode = false;
// Register the available reply parsers. Order matters: init_parser falls back to
// parsers[0] when no parser is named, so the optional hiredis parser is tried first.
try {
    // require() throws when the optional module cannot be loaded; nothing is pushed then
    parsers.push(require("./lib/parser/hiredis"));
} catch (err) {
    if (exports.debug_mode) {
        console.warn("hiredis parser not installed.");
    }
}

// the pure-JavaScript parser is always registered
parsers.push(require("./lib/parser/javascript"));
// RedisClient: wraps an existing stream and holds all per-connection state.
// Options read here: socket_nodelay, command_queue_high_water, command_queue_low_water,
// max_attempts, connect_timeout, enable_offline_queue.
29 function RedisClient(stream, options) {
// this.options aliases the caller's object, so defaults written below are visible to the caller
31 this.options = options = options || {};
33 this.connection_id = ++connection_id;
34 this.connected = false;
// socket_nodelay defaults to true when the caller leaves it undefined
37 if (this.options.socket_nodelay === undefined) {
38 this.options.socket_nodelay = true;
40 this.should_buffer = false;
// "||" means a configured 0 (or any other falsy value) falls back to the default
41 this.command_queue_high_water = this.options.command_queue_high_water || 1000;
42 this.command_queue_low_water = this.options.command_queue_low_water || 0;
43 this.max_attempts = null;
// only a positive numeric max_attempts is accepted; unary + coerces numeric strings
44 if (options.max_attempts && !isNaN(options.max_attempts) && options.max_attempts > 0) {
45 this.max_attempts = +options.max_attempts;
47 this.command_queue = new Queue(); // holds sent commands to de-pipeline them
48 this.offline_queue = new Queue(); // holds commands issued but not able to be sent
49 this.commands_sent = 0;
// connect_timeout stays false (disabled) unless a positive number is supplied
50 this.connect_timeout = false;
51 if (options.connect_timeout && !isNaN(options.connect_timeout) && options.connect_timeout > 0) {
52 this.connect_timeout = +options.connect_timeout;
// the offline queue is on by default; only an explicit boolean overrides it
55 this.enable_offline_queue = true;
56 if (typeof this.options.enable_offline_queue === "boolean") {
57 this.enable_offline_queue = this.options.enable_offline_queue;
60 this.initialize_retry_vars();
61 this.pub_sub_mode = false;
62 this.subscription_set = {};
63 this.monitoring = false;
65 this.server_info = {};
66 this.auth_pass = null;
67 this.parser_module = null;
68 this.selected_db = null; // save the selected db here, used when reconnecting
70 this.old_state = null;
// forward stream events to the matching client handlers
74 this.stream.on("connect", function () {
78 this.stream.on("data", function (buffer_from_socket) {
79 self.on_data(buffer_from_socket);
// only the message text of the stream error is handed to on_error
82 this.stream.on("error", function (msg) {
83 self.on_error(msg.message);
// "close" and "end" both funnel into connection_gone, tagged with the event name
86 this.stream.on("close", function () {
87 self.connection_gone("close");
90 this.stream.on("end", function () {
91 self.connection_gone("end");
94 this.stream.on("drain", function () {
95 self.should_buffer = false;
99 events.EventEmitter.call(this);
101 util.inherits(RedisClient, events.EventEmitter);
102 exports.RedisClient = RedisClient;
// Reset the reconnect bookkeeping to its initial values. Called from the constructor
// and again from on_connect.
104 RedisClient.prototype.initialize_retry_vars = function () {
105 this.retry_timer = null;
106 this.retry_totaltime = 0;
// starting delay; connection_gone multiplies it by retry_backoff before scheduling each retry
107 this.retry_delay = 150;
108 this.retry_backoff = 1.7;
// Fail everything that is still pending: every entry of offline_queue, then every entry
// of command_queue, has its callback (if it is a function) invoked with `message`.
// Each queue is replaced by a fresh Queue once it has been drained.
RedisClient.prototype.flush_and_error = function (message) {
    var client = this, pending;

    ["offline_queue", "command_queue"].forEach(function (queue_name) {
        // re-read the queue property on every pass, exactly like a plain while loop would
        while (client[queue_name].length > 0) {
            pending = client[queue_name].shift();
            if (typeof pending.callback === "function") {
                pending.callback(message);
            }
        }
        client[queue_name] = new Queue();
    });
};
// Handle a stream "error": log it in debug mode, fail every queued command with the
// message, mark the client disconnected, emit "error", then start reconnect handling.
// msg is the error's message text (the stream handler passes msg.message).
132 RedisClient.prototype.on_error = function (msg) {
133 var message = "Redis connection to " + this.host + ":" + this.port + " failed - " + msg,
// NOTE(review): self and command_obj are not used in the visible part of this function — confirm
134 self = this, command_obj;
140 if (exports.debug_mode) {
141 console.warn(message);
// queued callbacks receive the message string itself, not an Error object
144 this.flush_and_error(message);
146 this.connected = false;
149 this.emit("error", new Error(message));
150 // "error" events get turned into exceptions if they aren't listened for. If the user handled this error
151 // then we should try to reconnect.
152 this.connection_gone("error");
// Send AUTH with the stored password. send_anyway is raised around the call so that
// send_command writes the command even though the client is not marked ready.
155 RedisClient.prototype.do_auth = function () {
158 if (exports.debug_mode) {
159 console.log("Sending auth to " + self.host + ":" + self.port + " id " + self.connection_id);
161 self.send_anyway = true;
162 self.send_command("auth", [this.auth_pass], function (err, res) {
// match() with a string argument compiles it to a RegExp: true when the error text contains "LOADING"
164 if (err.toString().match("LOADING")) {
165 // if redis is still loading the db, it will not authenticate and everything else will fail
166 console.log("Redis still loading, trying to authenticate later");
167 setTimeout(function () {
169 }, 2000); // TODO - magic number alert
// any other error is surfaced as an "error" event on the client
172 return self.emit("error", new Error("Auth error: " + err.message));
// a reply other than "OK" is treated as a failed authentication
175 if (res.toString() !== "OK") {
176 return self.emit("error", new Error("Auth failed: " + res.toString()));
178 if (exports.debug_mode) {
179 console.log("Auth succeeded " + self.host + ":" + self.port + " id " + self.connection_id);
// hand the result to the callback stored by auth(), then clear it so it fires once
181 if (self.auth_callback) {
182 self.auth_callback(err, res);
183 self.auth_callback = null;
186 // now we are really connected
187 self.emit("connect");
188 if (self.options.no_ready_check) {
194 self.send_anyway = false;
// Stream "connect" handler: mark the client connected, reset per-connection state and
// retry bookkeeping, apply socket options, then branch on whether a password is stored.
197 RedisClient.prototype.on_connect = function () {
198 if (exports.debug_mode) {
199 console.log("Stream connected " + this.host + ":" + this.port + " id " + this.connection_id);
203 this.connected = true;
// counts how many times this client has connected
206 this.connections += 1;
207 this.command_queue = new Queue();
208 this.emitted_end = false;
209 this.initialize_retry_vars();
210 if (this.options.socket_nodelay) {
211 this.stream.setNoDelay();
213 this.stream.setTimeout(0);
// NOTE(review): presumably authentication runs first when auth_pass is set and "connect" is emitted otherwise — confirm
217 if (this.auth_pass) {
220 this.emit("connect");
222 if (this.options.no_ready_check) {
// Choose the reply parser module and create a parser instance wired to this client.
// With options.parser set, the registered parser of that name is required (an Error is
// thrown when none matches); otherwise the first registered parser is used.
RedisClient.prototype.init_parser = function () {
    var client = this, idx, matched = false;

    if (this.options.parser) {
        // stop at the first registered parser whose name matches
        for (idx = 0; idx < parsers.length && !matched; idx += 1) {
            if (parsers[idx].name === client.options.parser) {
                client.parser_module = parsers[idx];
                if (exports.debug_mode) {
                    console.log("Using parser module: " + client.parser_module.name);
                }
                matched = true;
            }
        }
        if (!matched) {
            throw new Error("Couldn't find named parser " + client.options.parser + " on this system");
        }
    } else {
        if (exports.debug_mode) {
            console.log("Using default parser module: " + parsers[0].name);
        }
        this.parser_module = parsers[0];
    }

    this.parser_module.debug_mode = exports.debug_mode;

    // return_buffers sends back Buffers from parser to callback. detect_buffers sends back Buffers
    // from parser, but converts to Strings if the input arguments are not Buffers.
    this.reply_parser = new this.parser_module.Parser({
        return_buffers: client.options.return_buffers || client.options.detect_buffers || false
    });

    // "reply error" is an error sent back by Redis
    this.reply_parser.on("reply error", function (reply) {
        client.return_error(new Error(reply));
    });
    this.reply_parser.on("reply", function (reply) {
        client.return_reply(reply);
    });
    // "error" is bad. Somehow the parser got confused. It'll try to reset and continue.
    this.reply_parser.on("error", function (err) {
        client.emit("error", new Error("Redis reply parser error: " + err.stack));
    });
};
// Runs once the server is usable: restore the state saved by connection_gone
// (monitoring, pub/sub mode, selected db) and re-issue the modal commands that go with it.
273 RedisClient.prototype.on_ready = function () {
278 if (this.old_state !== null) {
279 this.monitoring = this.old_state.monitoring;
280 this.pub_sub_mode = this.old_state.pub_sub_mode;
281 this.selected_db = this.old_state.selected_db;
282 this.old_state = null;
285 // magically restore any modal commands from a previous connection
286 if (this.selected_db !== null) {
287 this.send_command('select', [this.selected_db]);
289 if (this.pub_sub_mode === true) {
290 // only emit "ready" when all subscriptions were made again
291 var callback_count = 0;
292 var callback = function() {
// NOTE(review): loose "==" comparison; callback_count is initialised to the number 0 above
294 if (callback_count == 0) {
// subscription_set keys have the form "<prefix> <channel>", where prefix + "scribe" is the command name
298 Object.keys(this.subscription_set).forEach(function (key) {
// NOTE(review): split(" ") means a channel containing a space is re-sent only up to that space
299 var parts = key.split(" ");
300 if (exports.debug_mode) {
301 console.warn("sending pub/sub on_ready " + parts[0] + ", " + parts[1]);
304 self.send_command(parts[0] + "scribe", [parts[1]], callback);
307 } else if (this.monitoring) {
308 this.send_command("monitor");
310 this.send_offline_queue();
// Callback for the INFO request issued by ready_check: parse the "key:value" lines of the
// reply into an object, expose it as server_info, and use the "loading" field to decide
// whether the server is ready or has to be checked again later.
315 RedisClient.prototype.on_info_cmd = function (err, res) {
316 var self = this, obj = {}, lines, retry_time;
319 return self.emit("error", new Error("Ready check failed: " + err.message));
// the reply is split on CRLF, then each line on ":"
322 lines = res.toString().split("\r\n");
324 lines.forEach(function (line) {
325 var parts = line.split(':');
// only the text between the first and second ":" is kept as the value
327 obj[parts[0]] = parts[1];
// obj.versions collects the dotted redis_version components as numbers
332 obj.redis_version.split('.').forEach(function (num) {
333 obj.versions.push(+num);
336 // expose info key/vals to users
337 this.server_info = obj;
// a missing "loading" field is treated the same as loading === "0"
339 if (!obj.loading || (obj.loading && obj.loading === "0")) {
340 if (exports.debug_mode) {
341 console.log("Redis server ready.");
// loading_eta_seconds is converted to milliseconds for the retry timer
345 retry_time = obj.loading_eta_seconds * 1000;
346 if (retry_time > 1000) {
349 if (exports.debug_mode) {
350 console.log("Redis server still loading, trying again in " + retry_time);
352 setTimeout(function () {
// Ask the server for INFO and pass the outcome to on_info_cmd. The send_anyway flag is
// raised only for the duration of the info() call, so that this one command is written
// even though the client is not marked ready yet.
RedisClient.prototype.ready_check = function () {
    var client = this;

    function forward_info(err, res) {
        client.on_info_cmd(err, res);
    }

    if (exports.debug_mode) {
        console.log("checking server ready state...");
    }

    this.send_anyway = true; // secret flag to send_command to send something even if not "ready"
    this.info(forward_info);
    this.send_anyway = false;
};
// Replay every command that was queued while the client could not send, in queue order.
372 RedisClient.prototype.send_offline_queue = function () {
373 var command_obj, buffered_writes = 0;
375 while (this.offline_queue.length > 0) {
376 command_obj = this.offline_queue.shift();
377 if (exports.debug_mode) {
378 console.log("Sending offline command: " + command_obj.command);
// send_command returns !should_buffer, so this counts the sends that asked for back-pressure
380 buffered_writes += !this.send_command(command_obj.command, command_obj.args, command_obj.callback);
382 this.offline_queue = new Queue();
383 // Even though items were shifted off, Queue backing store still uses memory until next add, so just get a new Queue
// buffering is switched off again only when none of the replayed sends asked for it
385 if (!buffered_writes) {
386 this.should_buffer = false;
// Common handler for a lost connection; `why` names the event that triggered it
// ("close", "end" or "error"). Saves modal state for the next connection, fails pending
// commands, and schedules a reconnect with a growing delay.
391 RedisClient.prototype.connection_gone = function (why) {
392 var self = this, message;
394 // If a retry is already in progress, just let that happen
395 if (this.retry_timer) {
399 if (exports.debug_mode) {
400 console.warn("Redis connection is gone from " + why + " event.");
402 this.connected = false;
// snapshot modal state once per outage; on_ready restores it and resets old_state to null
405 if (this.old_state === null) {
407 monitoring: this.monitoring,
408 pub_sub_mode: this.pub_sub_mode,
409 selected_db: this.selected_db
411 this.old_state = state;
412 this.monitoring = false;
413 this.pub_sub_mode = false;
414 this.selected_db = null;
417 // since we are collapsing end and close, users don't expect to be called twice
418 if (! this.emitted_end) {
420 this.emitted_end = true;
423 this.flush_and_error("Redis connection gone from " + why + " event.");
425 // If this is a requested shutdown, then don't retry
427 this.retry_timer = null;
428 if (exports.debug_mode) {
429 console.warn("connection ended from quit command, not retrying.");
// the delay grows by retry_backoff on every pass and is floored to a whole number
434 this.retry_delay = Math.floor(this.retry_delay * this.retry_backoff);
436 if (exports.debug_mode) {
// NOTE(review): current_retry_delay is not assigned anywhere in the visible code (retry_delay is) — confirm this is not a stale name
437 console.log("Retry connection in " + this.current_retry_delay + " ms");
// max_attempts of null (the default) disables this limit
440 if (this.max_attempts && this.attempts >= this.max_attempts) {
441 this.retry_timer = null;
442 // TODO - some people need a "Redis is Broken mode" for future commands that errors immediately, and others
443 // want the program to exit. Right now, we just log, which doesn't really help in either case.
444 console.error("node_redis: Couldn't get Redis connection after " + this.max_attempts + " attempts.");
449 this.emit("reconnecting", {
450 delay: self.retry_delay,
451 attempt: self.attempts
453 this.retry_timer = setTimeout(function () {
454 if (exports.debug_mode) {
455 console.log("Retrying connection...");
// NOTE(review): if current_retry_delay is undefined this sum becomes NaN and the connect_timeout check below can never be true — confirm
458 self.retry_totaltime += self.current_retry_delay;
460 if (self.connect_timeout && self.retry_totaltime >= self.connect_timeout) {
461 self.retry_timer = null;
462 // TODO - engage Redis is Broken mode for future commands, or whatever
463 console.error("node_redis: Couldn't get Redis connection after " + self.retry_totaltime + "ms.");
467 self.stream.connect(self.port, self.host);
468 self.retry_timer = null;
469 }, this.retry_delay);
// Stream "data" handler: feed the received chunk to the reply parser. An exception thrown
// out of the parser is converted into an "error" event on the client.
RedisClient.prototype.on_data = function (data) {
    var parser = this.reply_parser;

    if (exports.debug_mode) {
        console.log("net read " + this.host + ":" + this.port + " id " + this.connection_id + ": " + data.toString());
    }

    try {
        parser.execute(data);
    } catch (parser_err) {
        // This is an unexpected parser problem, an exception that came from the parser code itself.
        // Parser should emit "error" events if it notices things are out of whack.
        // Callbacks that throw exceptions will land in return_reply(), below.
        // TODO - it might be nice to have a different "error" event for different types of errors
        this.emit("error", parser_err);
    }
};
// Deliver an error reply from Redis to the oldest command in command_queue.
488 RedisClient.prototype.return_error = function (err) {
// the command is removed from the queue before the remaining length is measured
489 var command_obj = this.command_queue.shift(), queue_len = this.command_queue.getLength();
491 if (this.pub_sub_mode === false && queue_len === 0) {
493 this.command_queue = new Queue();
// leave buffering mode once the queue has shrunk to the low-water mark
495 if (this.should_buffer && queue_len <= this.command_queue_low_water) {
497 this.should_buffer = false;
500 if (command_obj && typeof command_obj.callback === "function") {
502 command_obj.callback(err);
503 } catch (callback_err) {
504 // if a callback throws an exception, re-throw it on a new stack so the parser can keep going
505 process.nextTick(function () {
// no callback to receive the error: log it instead
510 console.log("node_redis: no callback to send error: " + err.message);
511 // this will probably not make it anywhere useful, but we might as well throw
512 process.nextTick(function () {
// Invoke callback(null, reply). An exception thrown by the callback is not allowed to
// unwind into the caller (the parser); it is re-thrown on a fresh stack via nextTick.
// Kept as a separate function because V8 does not optimize functions containing try/catch well.
function try_callback(callback, reply) {
    try {
        callback(null, reply);
    } catch (callback_err) {
        process.nextTick(function rethrow() {
            throw callback_err;
        });
    }
}
// hgetall replies arrive as a flat [field, value, field, value, ...] array. Convert that to
// an object keyed by field.toString(); values are stored untouched. An empty reply yields null.
function reply_to_object(reply) {
    var result = {}, pos = 0, total = reply.length;

    if (total === 0) {
        return null;
    }

    while (pos < total) {
        result[reply[pos].toString()] = reply[pos + 1];
        pos += 2;
    }

    return result;
}
// Convert a parser reply to strings: a Buffer becomes its string form, and the elements
// of an array are converted in place with toString(). Anything else is returned unchanged.
// null / undefined array elements (e.g. the slot of a missing key in an MGET reply) are
// left as they are instead of being dereferenced, which used to throw a TypeError.
function reply_to_strings(reply) {
    var i;

    if (Buffer.isBuffer(reply)) {
        return reply.toString();
    }

    if (Array.isArray(reply)) {
        for (i = 0; i < reply.length; i++) {
            if (reply[i] !== null && reply[i] !== undefined) {
                reply[i] = reply[i].toString();
            }
        }
        return reply;
    }

    return reply;
}
// Deliver a successful reply from the parser. Depending on client state the reply is either
// the answer to the oldest queued command, a pub/sub notification, or a MONITOR line.
564 RedisClient.prototype.return_reply = function (reply) {
565 var command_obj, obj, i, len, type, timestamp, argindex, args, queue_len;
567 command_obj = this.command_queue.shift(),
568 queue_len = this.command_queue.getLength();
570 if (this.pub_sub_mode === false && queue_len === 0) {
572 this.command_queue = new Queue(); // explicitly reclaim storage from old Queue
// leave buffering mode once the queue has shrunk to the low-water mark
574 if (this.should_buffer && queue_len <= this.command_queue_low_water) {
576 this.should_buffer = false;
// case 1: an ordinary command is waiting for this reply
579 if (command_obj && !command_obj.sub_command) {
580 if (typeof command_obj.callback === "function") {
581 if (this.options.detect_buffers && command_obj.buffer_args === false) {
582 // If detect_buffers option was specified, then the reply from the parser will be Buffers.
583 // If this command did not use Buffer arguments, then convert the reply to Strings here.
584 reply = reply_to_strings(reply);
587 // TODO - confusing and error-prone that hgetall is special cased in two places
588 if (reply && 'hgetall' === command_obj.command.toLowerCase()) {
589 reply = reply_to_object(reply);
592 try_callback(command_obj.callback, reply);
593 } else if (exports.debug_mode) {
594 console.log("no callback for reply: " + (reply && reply.toString && reply.toString()));
// case 2: pub/sub traffic; the first element of the array reply names the message type
596 } else if (this.pub_sub_mode || (command_obj && command_obj.sub_command)) {
597 if (Array.isArray(reply)) {
598 type = reply[0].toString();
600 if (type === "message") {
601 this.emit("message", reply[1].toString(), reply[2]); // channel, message
602 } else if (type === "pmessage") {
603 this.emit("pmessage", reply[1].toString(), reply[2].toString(), reply[3]); // pattern, channel, message
604 } else if (type === "subscribe" || type === "unsubscribe" || type === "psubscribe" || type === "punsubscribe") {
// reply[2] is the count emitted below; 0 ends pub/sub mode
605 if (reply[2] === 0) {
606 this.pub_sub_mode = false;
// NOTE(review): every other debug check in this file reads exports.debug_mode; this.debug_mode is not set in the visible code — confirm
607 if (this.debug_mode) {
608 console.log("All subscriptions removed, exiting pub/sub mode");
611 this.pub_sub_mode = true;
613 // subscribe commands take an optional callback and also emit an event, but only the first response is included in the callback
614 // TODO - document this or fix it so it works in a more obvious way
615 if (command_obj && typeof command_obj.callback === "function") {
616 try_callback(command_obj.callback, reply[1].toString());
618 this.emit(type, reply[1].toString(), reply[2]); // channel, count
620 throw new Error("subscriptions are active but got unknown reply type " + type);
622 } else if (! this.closing) {
623 throw new Error("subscriptions are active but got an invalid reply: " + reply);
// case 3: MONITOR output; the text before the first space is the timestamp and the
// double-quoted tokens that follow are the arguments (escaped quotes are unescaped)
625 } else if (this.monitoring) {
626 len = reply.indexOf(" ");
627 timestamp = reply.slice(0, len);
628 argindex = reply.indexOf('"');
629 args = reply.slice(argindex + 1, -1).split('" "').map(function (elem) {
630 return elem.replace(/\\"/g, '"');
632 this.emit("monitor", timestamp, args);
634 throw new Error("node_redis command queue state error. If you can reproduce this, please report it.");
// Record of one command as it travels through the offline and command queues.
//   command     - command name string
//   args        - array of command arguments (read back by pub_sub_command)
//   sub_command - true once the command has been classified as a pub/sub command
//   buffer_args - whether any argument was a Buffer
//   callback    - optional completion callback
// A named constructor (rather than an object literal) is marginally faster and, more
// importantly, shows up meaningfully in the V8 CPU profiler and in heap snapshots.
function Command(command, args, sub_command, buffer_args, callback) {
    var record = this;

    record.command = command;
    record.args = args;
    record.sub_command = sub_command;
    record.buffer_args = buffer_args;
    record.callback = callback;
}
// Validate, queue and write one command. `command` must be a string and `args` an array;
// the callback may be passed separately or as the last element of args.
// Returns !should_buffer, i.e. false when the caller is asked to slow down.
648 RedisClient.prototype.send_command = function (command, args, callback) {
649 var arg, this_args, command_obj, i, il, elem_count, buffer_args, stream = this.stream, command_str = "", buffered_writes = 0, last_arg_type;
651 if (typeof command !== "string") {
652 throw new Error("First argument to send_command must be the command name string, not " + typeof command);
655 if (Array.isArray(args)) {
656 if (typeof callback === "function") {
657 // probably the fastest way:
658 // client.command([arg1, arg2], cb); (straight passthrough)
659 // send_command(command, [arg1, arg2], cb);
660 } else if (! callback) {
661 // most people find this variable argument length form more convenient, but it uses arguments, which is slower
662 // client.command(arg1, arg2, cb); (wraps up arguments into an array)
663 // send_command(command, [arg1, arg2, cb]);
664 // client.command(arg1, arg2); (callback is optional)
665 // send_command(command, [arg1, arg2]);
666 // client.command(arg1, arg2, undefined); (callback is undefined)
667 // send_command(command, [arg1, arg2, undefined]);
// a trailing function or undefined is taken off args and used as the callback
668 last_arg_type = typeof args[args.length - 1];
669 if (last_arg_type === "function" || last_arg_type === "undefined") {
670 callback = args.pop();
673 throw new Error("send_command: last argument must be a callback or undefined");
676 throw new Error("send_command: second argument must be an array");
679 // if the last argument is an array and command is sadd, expand it out:
680 // client.sadd(arg1, [arg2, arg3, arg4], cb);
682 // client.sadd(arg1, arg2, arg3, arg4, cb);
683 if ((command === 'sadd' || command === 'SADD') && args.length > 0 && Array.isArray(args[args.length - 1])) {
684 args = args.slice(0, -1).concat(args[args.length - 1]);
// scan the arguments for Buffers; the result decides between the two write paths below
688 for (i = 0, il = args.length, arg; i < il; i += 1) {
689 if (Buffer.isBuffer(args[i])) {
694 command_obj = new Command(command, args, false, buffer_args, callback);
// not ready (and not forced through by send_anyway), or the stream cannot be written:
// queue the command offline when allowed, otherwise fail it
696 if ((!this.ready && !this.send_anyway) || !stream.writable) {
697 if (exports.debug_mode) {
698 if (!stream.writable) {
699 console.log("send command: stream is not writeable.");
703 if (this.enable_offline_queue) {
704 if (exports.debug_mode) {
705 console.log("Queueing " + command + " for next server connection.");
707 this.offline_queue.push(command_obj);
708 this.should_buffer = true;
710 var not_writeable_error = new Error('send_command: stream not writeable. enable_offline_queue is false');
711 if (command_obj.callback) {
712 command_obj.callback(not_writeable_error);
714 throw not_writeable_error;
// modal commands change client state; note the comparison is against lowercase names only
721 if (command === "subscribe" || command === "psubscribe" || command === "unsubscribe" || command === "punsubscribe") {
722 this.pub_sub_command(command_obj);
723 } else if (command === "monitor") {
724 this.monitoring = true;
725 } else if (command === "quit") {
727 } else if (this.pub_sub_mode === true) {
728 throw new Error("Connection in pub/sub mode, only pub/sub commands may be used");
730 this.command_queue.push(command_obj);
731 this.commands_sent += 1;
// element count of the multi-bulk request: the command name plus its arguments
733 elem_count = args.length + 1;
735 // Always use "Multi bulk commands", but if passed any Buffer args, then do multiple writes, one for each arg.
736 // This means that using Buffers in commands is going to be slower, so use Strings if you don't already have a Buffer.
738 command_str = "*" + elem_count + "\r\n$" + command.length + "\r\n" + command + "\r\n";
740 if (! buffer_args) { // Build up a string and send entire command in one write
741 for (i = 0, il = args.length, arg; i < il; i += 1) {
743 if (typeof arg !== "string") {
// bulk lengths are byte lengths, not character counts
746 command_str += "$" + Buffer.byteLength(arg) + "\r\n" + arg + "\r\n";
748 if (exports.debug_mode) {
749 console.log("send " + this.host + ":" + this.port + " id " + this.connection_id + ": " + command_str);
// stream.write() returning false is counted as a buffered write
751 buffered_writes += !stream.write(command_str);
753 if (exports.debug_mode) {
754 console.log("send command (" + command_str + ") has Buffer arguments");
756 buffered_writes += !stream.write(command_str);
758 for (i = 0, il = args.length, arg; i < il; i += 1) {
760 if (!(Buffer.isBuffer(arg) || arg instanceof String)) {
764 if (Buffer.isBuffer(arg)) {
765 if (arg.length === 0) {
766 if (exports.debug_mode) {
767 console.log("send_command: using empty string for 0 length buffer");
769 buffered_writes += !stream.write("$0\r\n\r\n");
771 buffered_writes += !stream.write("$" + arg.length + "\r\n");
772 buffered_writes += !stream.write(arg);
773 buffered_writes += !stream.write("\r\n");
774 if (exports.debug_mode) {
775 console.log("send_command: buffer send " + arg.length + " bytes");
779 if (exports.debug_mode) {
780 console.log("send_command: string send " + Buffer.byteLength(arg) + " bytes: " + arg);
782 buffered_writes += !stream.write("$" + Buffer.byteLength(arg) + "\r\n" + arg + "\r\n");
786 if (exports.debug_mode) {
787 console.log("send_command buffered_writes: " + buffered_writes, " should_buffer: " + this.should_buffer);
// request back-pressure when a write was buffered or the reply queue reached the high-water mark
789 if (buffered_writes || this.command_queue.getLength() >= this.command_queue_high_water) {
790 this.should_buffer = true;
792 return !this.should_buffer;
// Bookkeeping for subscribe / psubscribe / unsubscribe / punsubscribe. Puts the client in
// pub/sub mode, flags the command object as a sub_command, and keeps subscription_set in
// sync so that on_ready can re-issue the subscriptions after a reconnect.
//
// subscription_set keys are "<prefix> <channel>" with prefix "sub" for channels and "psub"
// for patterns; on_ready rebuilds the command name as prefix + "scribe".
//
// An (p)unsubscribe without arguments removes every tracked entry of that kind. Without
// this, the set kept all entries and on_ready re-subscribed them after a reconnect.
RedisClient.prototype.pub_sub_command = function (command_obj) {
    var i, key, command, args, prefix, tracked;

    if (this.pub_sub_mode === false && exports.debug_mode) {
        console.log("Entering pub/sub mode from " + command_obj.command);
    }
    this.pub_sub_mode = true;
    command_obj.sub_command = true;

    command = command_obj.command;
    args = command_obj.args;
    // anything that is not a plain (un)subscribe is treated as the pattern variant
    key = (command === "subscribe" || command === "unsubscribe") ? "sub" : "psub";

    if (command === "subscribe" || command === "psubscribe") {
        for (i = 0; i < args.length; i++) {
            this.subscription_set[key + " " + args[i]] = true;
        }
    } else if (args.length === 0) {
        // no channels named: forget all tracked entries of this kind
        prefix = key + " ";
        tracked = Object.keys(this.subscription_set);
        for (i = 0; i < tracked.length; i++) {
            if (tracked[i].indexOf(prefix) === 0) {
                delete this.subscription_set[tracked[i]];
            }
        }
    } else {
        for (i = 0; i < args.length; i++) {
            delete this.subscription_set[key + " " + args[i]];
        }
    }
};
// Shut the connection down from the client side and return the result of stream.end().
827 RedisClient.prototype.end = function () {
// NOTE(review): overwrites the stream's _events table directly, presumably to drop every listener before ending — confirm
828 this.stream._events = {};
829 this.connected = false;
831 return this.stream.end();
// Multi collects commands for one MULTI ... EXEC transaction on `client`.
// The queue always starts with the ["MULTI"] entry; when `args` is an array, its elements
// (one array per command) are appended after it.
function Multi(client, args) {
    var initial_queue = [["MULTI"]];

    this.client = client;
    this.queue = Array.isArray(args) ? initial_queue.concat(args) : initial_queue;
}
842 exports.Multi = Multi;
// Take two arrays and return the union of their elements as an array of strings
// (elements are used as object keys, so each is converted to its string form).
// The accumulator has no prototype, so a member such as "__proto__" is stored as an
// ordinary key instead of being silently dropped.
function set_union(seta, setb) {
    var obj = Object.create(null);

    function remember(val) {
        obj[val] = true;
    }

    seta.forEach(remember);
    setb.forEach(remember);

    return Object.keys(obj);
}
857 // This static list of commands is updated from time to time. ./lib/commands.js can be updated with generate_commands.js
// the built-in list below is merged with whatever ./lib/commands exports; set_union removes duplicates
858 commands = set_union(["get", "set", "setnx", "setex", "append", "strlen", "del", "exists", "setbit", "getbit", "setrange", "getrange", "substr",
859 "incr", "decr", "mget", "rpush", "lpush", "rpushx", "lpushx", "linsert", "rpop", "lpop", "brpop", "brpoplpush", "blpop", "llen", "lindex",
860 "lset", "lrange", "ltrim", "lrem", "rpoplpush", "sadd", "srem", "smove", "sismember", "scard", "spop", "srandmember", "sinter", "sinterstore",
861 "sunion", "sunionstore", "sdiff", "sdiffstore", "smembers", "zadd", "zincrby", "zrem", "zremrangebyscore", "zremrangebyrank", "zunionstore",
862 "zinterstore", "zrange", "zrangebyscore", "zrevrangebyscore", "zcount", "zrevrange", "zcard", "zscore", "zrank", "zrevrank", "hset", "hsetnx",
863 "hget", "hmset", "hmget", "hincrby", "hdel", "hlen", "hkeys", "hvals", "hgetall", "hexists", "incrby", "decrby", "getset", "mset", "msetnx",
864 "randomkey", "select", "move", "rename", "renamenx", "expire", "expireat", "keys", "dbsize", "auth", "ping", "echo", "save", "bgsave",
865 "bgrewriteaof", "shutdown", "lastsave", "type", "multi", "exec", "discard", "sync", "flushdb", "flushall", "sort", "info", "monitor", "ttl",
866 "persist", "slaveof", "debug", "config", "subscribe", "unsubscribe", "psubscribe", "punsubscribe", "publish", "watch", "unwatch", "cluster",
867 "restore", "migrate", "dump", "object", "client", "eval", "evalsha"], require("./lib/commands"));
// generate one RedisClient method and one Multi method per command, each also under its upper-case name
869 commands.forEach(function (command) {
870 RedisClient.prototype[command] = function (args, callback) {
// fast path: (argsArray, callback) is passed straight through
871 if (Array.isArray(args) && typeof callback === "function") {
872 return this.send_command(command, args, callback);
// otherwise all arguments are collected into an array; send_command extracts a trailing callback
874 return this.send_command(command, to_array(arguments));
877 RedisClient.prototype[command.toUpperCase()] = RedisClient.prototype[command];
// Multi methods only record [command, ...arguments] in the transaction queue
879 Multi.prototype[command] = function () {
880 this.queue.push([command].concat(to_array(arguments)));
883 Multi.prototype[command.toUpperCase()] = Multi.prototype[command];
// NOTE(review): the comment below says "select_db", but the property written is selected_db
886 // store db in this.select_db to restore it on reconnect
// overrides the generated select method so the chosen db can be remembered
887 RedisClient.prototype.select = function (db, callback) {
890 this.send_command('select', [db], function (err, res) {
892 self.selected_db = db;
// the user callback is optional
894 if (typeof(callback) === 'function') {
899 RedisClient.prototype.SELECT = RedisClient.prototype.select;
// Remember the password (first argument) and the optional callback (second argument) so
// they can be used on connect and reconnect. When the client is already connected the
// AUTH command is also sent right away, with the same argument list.
// NOTE(review): in debug mode the password is written to the log in clear text.
RedisClient.prototype.auth = function () {
    var auth_args = to_array(arguments),
        password = auth_args[0],
        on_auth = auth_args[1];

    this.auth_pass = password;
    this.auth_callback = on_auth;

    if (exports.debug_mode) {
        console.log("Saving auth as " + this.auth_pass);
    }

    if (!this.connected) {
        return;
    }
    this.send_command("auth", auth_args);
};
RedisClient.prototype.AUTH = RedisClient.prototype.auth;
// HMGET accepts three calling conventions:
//   hmget(key, [field, ...], callback)  - the key is prepended to the field array
//   hmget([key, field, ...], callback)  - the array is passed through as-is
//   hmget(key, field, ..., [callback])  - all arguments are collected into one array
// Returns whatever send_command returns.
RedisClient.prototype.hmget = function (arg1, arg2, arg3) {
    var key_plus_field_array = Array.isArray(arg2) && typeof arg3 === "function";

    if (key_plus_field_array) {
        return this.send_command("hmget", [arg1].concat(arg2), arg3);
    }

    if (Array.isArray(arg1) && typeof arg2 === "function") {
        return this.send_command("hmget", arg1, arg2);
    }

    return this.send_command("hmget", to_array(arguments));
};
RedisClient.prototype.HMGET = RedisClient.prototype.hmget;
// HMSET with support for an object of field/value pairs: hmset(key, {f1: v1, f2: v2}, [callback]).
// The (argsArray, callback) form is passed straight through to send_command.
927 RedisClient.prototype.hmset = function (args, callback) {
928 var tmp_args, tmp_keys, i, il, key;
930 if (Array.isArray(args) && typeof callback === "function") {
931 return this.send_command("hmset", args, callback);
// variable-argument form: a trailing function is the callback
934 args = to_array(arguments);
935 if (typeof args[args.length - 1] === "function") {
936 callback = args[args.length - 1];
942 if (args.length === 2 && typeof args[0] === "string" && typeof args[1] === "object") {
943 // User does: client.hmset(key, {key1: val1, key2: val2})
944 tmp_args = [ args[0] ];
945 tmp_keys = Object.keys(args[1]);
946 for (i = 0, il = tmp_keys.length; i < il ; i++) {
// values of the object form must be strings; anything else is reported as an error
949 if (typeof args[1][key] !== "string") {
// NOTE(review): Error() only uses its first argument as the message, so key and value are not included in it
950 var err = new Error("hmset expected value to be a string", key, ":", args[1][key]);
951 if (callback) return callback(err);
954 tmp_args.push(args[1][key]);
959 return this.send_command("hmset", args, callback);
961 RedisClient.prototype.HMSET = RedisClient.prototype.hmset;
// Queue an HMSET inside a transaction; accepts the same hmset(key, {field: value}) object form.
963 Multi.prototype.hmset = function () {
964 var args = to_array(arguments), tmp_args;
965 if (args.length >= 2 && typeof args[0] === "string" && typeof args[1] === "object") {
// flatten the object into ["hmset", key, ...] followed by its values
966 tmp_args = [ "hmset", args[0] ];
// NOTE(review): map() is used only for its side effect here; its return value is unused
967 Object.keys(args[1]).map(function (key) {
969 tmp_args.push(args[1][key]);
// a third argument is carried along at the end of the queued entry
972 tmp_args.push(args[2]);
// plain argument form: only the command name is prepended
976 args.unshift("hmset");
979 this.queue.push(args);
982 Multi.prototype.HMSET = Multi.prototype.hmset;
// Send every queued command followed by EXEC, then hand each transaction reply to the
// per-command callback (when one was queued) and the whole reply array to `callback`.
984 Multi.prototype.exec = function (callback) {
987 // drain queue, callback will catch "QUEUED" or error
988 // TODO - get rid of all of these anonymous functions which are elegant but slow
989 this.queue.forEach(function (args, index) {
990 var command = args[0], obj;
// strip the command name, and a trailing per-command callback if present
991 if (typeof args[args.length - 1] === "function") {
992 args = args.slice(1, -1);
994 args = args.slice(1);
996 if (args.length === 1 && Array.isArray(args[0])) {
// hmset queued with an object argument: the object's values are appended to args
999 if (command.toLowerCase() === 'hmset' && typeof args[1] === 'object') {
1001 Object.keys(obj).forEach(function (key) {
1003 args.push(obj[key]);
1006 this.client.send_command(command, args, function (err, reply) {
// a failed command is reported to its own callback when it has one, otherwise thrown
1008 var cur = self.queue[index];
1009 if (typeof cur[cur.length - 1] === "function") {
1010 cur[cur.length - 1](err);
1012 throw new Error(err);
// the failed entry is removed from the queue
1014 self.queue.splice(index, 1);
1019 // TODO - make this callback part of Multi.prototype instead of creating it each time
1020 return this.client.send_command("EXEC", [], function (err, replies) {
1023 callback(new Error(err));
1026 throw new Error(err);
1030 var i, il, j, jl, reply, args;
// queue[0] is the MULTI entry, so reply i - 1 belongs to queue entry i
1033 for (i = 1, il = self.queue.length; i < il; i += 1) {
1034 reply = replies[i - 1];
1035 args = self.queue[i];
1037 // TODO - confusing and error-prone that hgetall is special cased in two places
1038 if (reply && args[0].toLowerCase() === "hgetall") {
1039 replies[i - 1] = reply = reply_to_object(reply);
1042 if (typeof args[args.length - 1] === "function") {
1043 args[args.length - 1](null, reply);
1049 callback(null, replies);
1053 Multi.prototype.EXEC = Multi.prototype.exec;
// multi() / MULTI() start a transaction: each returns a new Multi bound to this client,
// optionally pre-loaded with `args` (an array of queued command arrays).
// The two names get separate function objects.
["multi", "MULTI"].forEach(function (method_name) {
    RedisClient.prototype[method_name] = function (args) {
        return new Multi(this, args);
    };
});
1063 // stash original eval method
// NOTE(review): a variable named "eval" shadows the global eval and is a SyntaxError in strict mode
1064 var eval = RedisClient.prototype.eval;
1065 // hook eval with an attempt to evalsha for cached scripts
1066 RedisClient.prototype.eval =
1067 RedisClient.prototype.EVAL = function () {
1069 args = to_array(arguments),
// a trailing function is the user callback
1072 if (typeof args[args.length - 1] === "function") {
1073 callback = args.pop();
1076 // replace script source with sha value
1077 var source = args[0];
// hex SHA-1 digest of the script source, sent to evalsha in place of the source
1078 args[0] = crypto.createHash("sha1").update(source).digest("hex");
1080 self.evalsha(args, function (err, reply) {
// an error containing NOSCRIPT falls back to the original eval method
1081 if (err && /NOSCRIPT/.test(err.message)) {
1083 eval.call(self, args, callback);
1085 } else if (callback) {
1086 callback(err, reply);
1092 exports.createClient = function (port_arg, host_arg, options) {
1093 var port = port_arg || default_port,
1094 host = host_arg || default_host,
1095 redis_client, net_client;
1097 net_client = net.createConnection(port, host);
1099 redis_client = new RedisClient(net_client, options);
1101 redis_client.port = port;
1102 redis_client.host = host;
1104 return redis_client;
1107 exports.print = function (err, reply) {
1109 console.log("Error: " + err);
1111 console.log("Reply: " + reply);