Bug:Fix file validation issue
[vnfsdk/refrepo.git] / vnfmarket / src / main / webapp / vnfmarket / node_modules / redis / index.js
1 /*global Buffer require exports console setTimeout */
2
3 var net = require("net"),
4     util = require("./lib/util"),
5     Queue = require("./lib/queue"),
6     to_array = require("./lib/to_array"),
7     events = require("events"),
8     crypto = require("crypto"),
9     parsers = [], commands,
10     connection_id = 0,
11     default_port = 6379,
12     default_host = "127.0.0.1";
13
14 // can set this to true to enable for all connections
15 exports.debug_mode = false;
16
17 // hiredis might not be installed
18 try {
19     require("./lib/parser/hiredis");
20     parsers.push(require("./lib/parser/hiredis"));
21 } catch (err) {
22     if (exports.debug_mode) {
23         console.warn("hiredis parser not installed.");
24     }
25 }
26
27 parsers.push(require("./lib/parser/javascript"));
28
29 function RedisClient(stream, options) {
30     this.stream = stream;
31     this.options = options = options || {};
32
33     this.connection_id = ++connection_id;
34     this.connected = false;
35     this.ready = false;
36     this.connections = 0;
37     if (this.options.socket_nodelay === undefined) {
38         this.options.socket_nodelay = true;
39     }
40     this.should_buffer = false;
41     this.command_queue_high_water = this.options.command_queue_high_water || 1000;
42     this.command_queue_low_water = this.options.command_queue_low_water || 0;
43     this.max_attempts = null;
44     if (options.max_attempts && !isNaN(options.max_attempts) && options.max_attempts > 0) {
45         this.max_attempts = +options.max_attempts;
46     }
47     this.command_queue = new Queue(); // holds sent commands to de-pipeline them
48     this.offline_queue = new Queue(); // holds commands issued but not able to be sent
49     this.commands_sent = 0;
50     this.connect_timeout = false;
51     if (options.connect_timeout && !isNaN(options.connect_timeout) && options.connect_timeout > 0) {
52         this.connect_timeout = +options.connect_timeout;
53     }
54
55     this.enable_offline_queue = true;
56     if (typeof this.options.enable_offline_queue === "boolean") {
57         this.enable_offline_queue = this.options.enable_offline_queue;
58     }
59
60     this.initialize_retry_vars();
61     this.pub_sub_mode = false;
62     this.subscription_set = {};
63     this.monitoring = false;
64     this.closing = false;
65     this.server_info = {};
66     this.auth_pass = null;
67     this.parser_module = null;
68     this.selected_db = null;    // save the selected db here, used when reconnecting
69
70     this.old_state = null;
71
72     var self = this;
73
74     this.stream.on("connect", function () {
75         self.on_connect();
76     });
77
78     this.stream.on("data", function (buffer_from_socket) {
79         self.on_data(buffer_from_socket);
80     });
81
82     this.stream.on("error", function (msg) {
83         self.on_error(msg.message);
84     });
85
86     this.stream.on("close", function () {
87         self.connection_gone("close");
88     });
89
90     this.stream.on("end", function () {
91         self.connection_gone("end");
92     });
93
94     this.stream.on("drain", function () {
95         self.should_buffer = false;
96         self.emit("drain");
97     });
98
99     events.EventEmitter.call(this);
100 }
101 util.inherits(RedisClient, events.EventEmitter);
102 exports.RedisClient = RedisClient;
103
104 RedisClient.prototype.initialize_retry_vars = function () {
105     this.retry_timer = null;
106     this.retry_totaltime = 0;
107     this.retry_delay = 150;
108     this.retry_backoff = 1.7;
109     this.attempts = 1;
110 };
111
112 // flush offline_queue and command_queue, erroring any items with a callback first
113 RedisClient.prototype.flush_and_error = function (message) {
114     var command_obj;
115     while (this.offline_queue.length > 0) {
116         command_obj = this.offline_queue.shift();
117         if (typeof command_obj.callback === "function") {
118             command_obj.callback(message);
119         }
120     }
121     this.offline_queue = new Queue();
122
123     while (this.command_queue.length > 0) {
124         command_obj = this.command_queue.shift();
125         if (typeof command_obj.callback === "function") {
126             command_obj.callback(message);
127         }
128     }
129     this.command_queue = new Queue();
130 };
131
132 RedisClient.prototype.on_error = function (msg) {
133     var message = "Redis connection to " + this.host + ":" + this.port + " failed - " + msg,
134         self = this, command_obj;
135
136     if (this.closing) {
137         return;
138     }
139
140     if (exports.debug_mode) {
141         console.warn(message);
142     }
143
144     this.flush_and_error(message);
145
146     this.connected = false;
147     this.ready = false;
148
149     this.emit("error", new Error(message));
150     // "error" events get turned into exceptions if they aren't listened for.  If the user handled this error
151     // then we should try to reconnect.
152     this.connection_gone("error");
153 };
154
155 RedisClient.prototype.do_auth = function () {
156     var self = this;
157
158     if (exports.debug_mode) {
159         console.log("Sending auth to " + self.host + ":" + self.port + " id " + self.connection_id);
160     }
161     self.send_anyway = true;
162     self.send_command("auth", [this.auth_pass], function (err, res) {
163         if (err) {
164             if (err.toString().match("LOADING")) {
165                 // if redis is still loading the db, it will not authenticate and everything else will fail
166                 console.log("Redis still loading, trying to authenticate later");
167                 setTimeout(function () {
168                     self.do_auth();
169                 }, 2000); // TODO - magic number alert
170                 return;
171             } else {
172                 return self.emit("error", new Error("Auth error: " + err.message));
173             }
174         }
175         if (res.toString() !== "OK") {
176             return self.emit("error", new Error("Auth failed: " + res.toString()));
177         }
178         if (exports.debug_mode) {
179             console.log("Auth succeeded " + self.host + ":" + self.port + " id " + self.connection_id);
180         }
181         if (self.auth_callback) {
182             self.auth_callback(err, res);
183             self.auth_callback = null;
184         }
185
186         // now we are really connected
187         self.emit("connect");
188         if (self.options.no_ready_check) {
189             self.on_ready();
190         } else {
191             self.ready_check();
192         }
193     });
194     self.send_anyway = false;
195 };
196
197 RedisClient.prototype.on_connect = function () {
198     if (exports.debug_mode) {
199         console.log("Stream connected " + this.host + ":" + this.port + " id " + this.connection_id);
200     }
201     var self = this;
202
203     this.connected = true;
204     this.ready = false;
205     this.attempts = 0;
206     this.connections += 1;
207     this.command_queue = new Queue();
208     this.emitted_end = false;
209     this.initialize_retry_vars();
210     if (this.options.socket_nodelay) {
211         this.stream.setNoDelay();
212     }
213     this.stream.setTimeout(0);
214
215     this.init_parser();
216
217     if (this.auth_pass) {
218         this.do_auth();
219     } else {
220         this.emit("connect");
221
222         if (this.options.no_ready_check) {
223             this.on_ready();
224         } else {
225             this.ready_check();
226         }
227     }
228 };
229
230 RedisClient.prototype.init_parser = function () {
231     var self = this;
232
233     if (this.options.parser) {
234         if (! parsers.some(function (parser) {
235             if (parser.name === self.options.parser) {
236                 self.parser_module = parser;
237                 if (exports.debug_mode) {
238                     console.log("Using parser module: " + self.parser_module.name);
239                 }
240                 return true;
241             }
242         })) {
243             throw new Error("Couldn't find named parser " + self.options.parser + " on this system");
244         }
245     } else {
246         if (exports.debug_mode) {
247             console.log("Using default parser module: " + parsers[0].name);
248         }
249         this.parser_module = parsers[0];
250     }
251
252     this.parser_module.debug_mode = exports.debug_mode;
253
254     // return_buffers sends back Buffers from parser to callback. detect_buffers sends back Buffers from parser, but
255     // converts to Strings if the input arguments are not Buffers.
256     this.reply_parser = new this.parser_module.Parser({
257         return_buffers: self.options.return_buffers || self.options.detect_buffers || false
258     });
259
260     // "reply error" is an error sent back by Redis
261     this.reply_parser.on("reply error", function (reply) {
262         self.return_error(new Error(reply));
263     });
264     this.reply_parser.on("reply", function (reply) {
265         self.return_reply(reply);
266     });
267     // "error" is bad.  Somehow the parser got confused.  It'll try to reset and continue.
268     this.reply_parser.on("error", function (err) {
269         self.emit("error", new Error("Redis reply parser error: " + err.stack));
270     });
271 };
272
273 RedisClient.prototype.on_ready = function () {
274     var self = this;
275
276     this.ready = true;
277
278     if (this.old_state !== null) {
279         this.monitoring = this.old_state.monitoring;
280         this.pub_sub_mode = this.old_state.pub_sub_mode;
281         this.selected_db = this.old_state.selected_db;
282         this.old_state = null;
283     }
284
285     // magically restore any modal commands from a previous connection
286     if (this.selected_db !== null) {
287         this.send_command('select', [this.selected_db]);
288     }
289     if (this.pub_sub_mode === true) {
290         // only emit "ready" when all subscriptions were made again
291         var callback_count = 0;
292         var callback = function() {
293             callback_count--;
294             if (callback_count == 0) {
295                 self.emit("ready");
296             }
297         }
298         Object.keys(this.subscription_set).forEach(function (key) {
299             var parts = key.split(" ");
300             if (exports.debug_mode) {
301                 console.warn("sending pub/sub on_ready " + parts[0] + ", " + parts[1]);
302             }
303             callback_count++;
304             self.send_command(parts[0] + "scribe", [parts[1]], callback);
305         });
306         return;
307     } else if (this.monitoring) {
308         this.send_command("monitor");
309     } else {
310         this.send_offline_queue();
311     }
312     this.emit("ready");
313 };
314
315 RedisClient.prototype.on_info_cmd = function (err, res) {
316     var self = this, obj = {}, lines, retry_time;
317
318     if (err) {
319         return self.emit("error", new Error("Ready check failed: " + err.message));
320     }
321
322     lines = res.toString().split("\r\n");
323
324     lines.forEach(function (line) {
325         var parts = line.split(':');
326         if (parts[1]) {
327             obj[parts[0]] = parts[1];
328         }
329     });
330
331     obj.versions = [];
332     obj.redis_version.split('.').forEach(function (num) {
333         obj.versions.push(+num);
334     });
335
336     // expose info key/vals to users
337     this.server_info = obj;
338
339     if (!obj.loading || (obj.loading && obj.loading === "0")) {
340         if (exports.debug_mode) {
341             console.log("Redis server ready.");
342         }
343         this.on_ready();
344     } else {
345         retry_time = obj.loading_eta_seconds * 1000;
346         if (retry_time > 1000) {
347             retry_time = 1000;
348         }
349         if (exports.debug_mode) {
350             console.log("Redis server still loading, trying again in " + retry_time);
351         }
352         setTimeout(function () {
353             self.ready_check();
354         }, retry_time);
355     }
356 };
357
358 RedisClient.prototype.ready_check = function () {
359     var self = this;
360
361     if (exports.debug_mode) {
362         console.log("checking server ready state...");
363     }
364
365     this.send_anyway = true;  // secret flag to send_command to send something even if not "ready"
366     this.info(function (err, res) {
367         self.on_info_cmd(err, res);
368     });
369     this.send_anyway = false;
370 };
371
372 RedisClient.prototype.send_offline_queue = function () {
373     var command_obj, buffered_writes = 0;
374
375     while (this.offline_queue.length > 0) {
376         command_obj = this.offline_queue.shift();
377         if (exports.debug_mode) {
378             console.log("Sending offline command: " + command_obj.command);
379         }
380         buffered_writes += !this.send_command(command_obj.command, command_obj.args, command_obj.callback);
381     }
382     this.offline_queue = new Queue();
383     // Even though items were shifted off, Queue backing store still uses memory until next add, so just get a new Queue
384
385     if (!buffered_writes) {
386         this.should_buffer = false;
387         this.emit("drain");
388     }
389 };
390
391 RedisClient.prototype.connection_gone = function (why) {
392     var self = this, message;
393
394     // If a retry is already in progress, just let that happen
395     if (this.retry_timer) {
396         return;
397     }
398
399     if (exports.debug_mode) {
400         console.warn("Redis connection is gone from " + why + " event.");
401     }
402     this.connected = false;
403     this.ready = false;
404
405     if (this.old_state === null) {
406         var state = {
407             monitoring: this.monitoring,
408             pub_sub_mode: this.pub_sub_mode,
409             selected_db: this.selected_db
410         };
411         this.old_state = state;
412         this.monitoring = false;
413         this.pub_sub_mode = false;
414         this.selected_db = null;
415     }
416
417     // since we are collapsing end and close, users don't expect to be called twice
418     if (! this.emitted_end) {
419         this.emit("end");
420         this.emitted_end = true;
421     }
422
423     this.flush_and_error("Redis connection gone from " + why + " event.");
424
425     // If this is a requested shutdown, then don't retry
426     if (this.closing) {
427         this.retry_timer = null;
428         if (exports.debug_mode) {
429             console.warn("connection ended from quit command, not retrying.");
430         }
431         return;
432     }
433
434     this.retry_delay = Math.floor(this.retry_delay * this.retry_backoff);
435
436     if (exports.debug_mode) {
437         console.log("Retry connection in " + this.current_retry_delay + " ms");
438     }
439
440     if (this.max_attempts && this.attempts >= this.max_attempts) {
441         this.retry_timer = null;
442         // TODO - some people need a "Redis is Broken mode" for future commands that errors immediately, and others
443         // want the program to exit.  Right now, we just log, which doesn't really help in either case.
444         console.error("node_redis: Couldn't get Redis connection after " + this.max_attempts + " attempts.");
445         return;
446     }
447
448     this.attempts += 1;
449     this.emit("reconnecting", {
450         delay: self.retry_delay,
451         attempt: self.attempts
452     });
453     this.retry_timer = setTimeout(function () {
454         if (exports.debug_mode) {
455             console.log("Retrying connection...");
456         }
457
458         self.retry_totaltime += self.current_retry_delay;
459
460         if (self.connect_timeout && self.retry_totaltime >= self.connect_timeout) {
461             self.retry_timer = null;
462             // TODO - engage Redis is Broken mode for future commands, or whatever
463             console.error("node_redis: Couldn't get Redis connection after " + self.retry_totaltime + "ms.");
464             return;
465         }
466
467         self.stream.connect(self.port, self.host);
468         self.retry_timer = null;
469     }, this.retry_delay);
470 };
471
472 RedisClient.prototype.on_data = function (data) {
473     if (exports.debug_mode) {
474         console.log("net read " + this.host + ":" + this.port + " id " + this.connection_id + ": " + data.toString());
475     }
476
477     try {
478         this.reply_parser.execute(data);
479     } catch (err) {
480         // This is an unexpected parser problem, an exception that came from the parser code itself.
481         // Parser should emit "error" events if it notices things are out of whack.
482         // Callbacks that throw exceptions will land in return_reply(), below.
483         // TODO - it might be nice to have a different "error" event for different types of errors
484         this.emit("error", err);
485     }
486 };
487
488 RedisClient.prototype.return_error = function (err) {
489     var command_obj = this.command_queue.shift(), queue_len = this.command_queue.getLength();
490
491     if (this.pub_sub_mode === false && queue_len === 0) {
492         this.emit("idle");
493         this.command_queue = new Queue();
494     }
495     if (this.should_buffer && queue_len <= this.command_queue_low_water) {
496         this.emit("drain");
497         this.should_buffer = false;
498     }
499
500     if (command_obj && typeof command_obj.callback === "function") {
501         try {
502             command_obj.callback(err);
503         } catch (callback_err) {
504             // if a callback throws an exception, re-throw it on a new stack so the parser can keep going
505             process.nextTick(function () {
506                 throw callback_err;
507             });
508         }
509     } else {
510         console.log("node_redis: no callback to send error: " + err.message);
511         // this will probably not make it anywhere useful, but we might as well throw
512         process.nextTick(function () {
513             throw err;
514         });
515     }
516 };
517
518 // if a callback throws an exception, re-throw it on a new stack so the parser can keep going.
519 // put this try/catch in its own function because V8 doesn't optimize this well yet.
520 function try_callback(callback, reply) {
521     try {
522         callback(null, reply);
523     } catch (err) {
524         process.nextTick(function () {
525             throw err;
526         });
527     }
528 }
529
530 // hgetall converts its replies to an Object.  If the reply is empty, null is returned.
531 function reply_to_object(reply) {
532     var obj = {}, j, jl, key, val;
533
534     if (reply.length === 0) {
535         return null;
536     }
537
538     for (j = 0, jl = reply.length; j < jl; j += 2) {
539         key = reply[j].toString();
540         val = reply[j + 1];
541         obj[key] = val;
542     }
543
544     return obj;
545 }
546
547 function reply_to_strings(reply) {
548     var i;
549
550     if (Buffer.isBuffer(reply)) {
551         return reply.toString();
552     }
553
554     if (Array.isArray(reply)) {
555         for (i = 0; i < reply.length; i++) {
556             reply[i] = reply[i].toString();
557         }
558         return reply;
559     }
560
561     return reply;
562 }
563
564 RedisClient.prototype.return_reply = function (reply) {
565     var command_obj, obj, i, len, type, timestamp, argindex, args, queue_len;
566
567     command_obj = this.command_queue.shift(),
568     queue_len   = this.command_queue.getLength();
569
570     if (this.pub_sub_mode === false && queue_len === 0) {
571         this.emit("idle");
572         this.command_queue = new Queue();  // explicitly reclaim storage from old Queue
573     }
574     if (this.should_buffer && queue_len <= this.command_queue_low_water) {
575         this.emit("drain");
576         this.should_buffer = false;
577     }
578
579     if (command_obj && !command_obj.sub_command) {
580         if (typeof command_obj.callback === "function") {
581             if (this.options.detect_buffers && command_obj.buffer_args === false) {
582                 // If detect_buffers option was specified, then the reply from the parser will be Buffers.
583                 // If this command did not use Buffer arguments, then convert the reply to Strings here.
584                 reply = reply_to_strings(reply);
585             }
586
587             // TODO - confusing and error-prone that hgetall is special cased in two places
588             if (reply && 'hgetall' === command_obj.command.toLowerCase()) {
589                 reply = reply_to_object(reply);
590             }
591
592             try_callback(command_obj.callback, reply);
593         } else if (exports.debug_mode) {
594             console.log("no callback for reply: " + (reply && reply.toString && reply.toString()));
595         }
596     } else if (this.pub_sub_mode || (command_obj && command_obj.sub_command)) {
597         if (Array.isArray(reply)) {
598             type = reply[0].toString();
599
600             if (type === "message") {
601                 this.emit("message", reply[1].toString(), reply[2]); // channel, message
602             } else if (type === "pmessage") {
603                 this.emit("pmessage", reply[1].toString(), reply[2].toString(), reply[3]); // pattern, channel, message
604             } else if (type === "subscribe" || type === "unsubscribe" || type === "psubscribe" || type === "punsubscribe") {
605                 if (reply[2] === 0) {
606                     this.pub_sub_mode = false;
607                     if (this.debug_mode) {
608                         console.log("All subscriptions removed, exiting pub/sub mode");
609                     }
610                 } else {
611                     this.pub_sub_mode = true;
612                 }
613                 // subscribe commands take an optional callback and also emit an event, but only the first response is included in the callback
614                 // TODO - document this or fix it so it works in a more obvious way
615                 if (command_obj && typeof command_obj.callback === "function") {
616                     try_callback(command_obj.callback, reply[1].toString());
617                 }
618                 this.emit(type, reply[1].toString(), reply[2]); // channel, count
619             } else {
620                 throw new Error("subscriptions are active but got unknown reply type " + type);
621             }
622         } else if (! this.closing) {
623             throw new Error("subscriptions are active but got an invalid reply: " + reply);
624         }
625     } else if (this.monitoring) {
626         len = reply.indexOf(" ");
627         timestamp = reply.slice(0, len);
628         argindex = reply.indexOf('"');
629         args = reply.slice(argindex + 1, -1).split('" "').map(function (elem) {
630             return elem.replace(/\\"/g, '"');
631         });
632         this.emit("monitor", timestamp, args);
633     } else {
634         throw new Error("node_redis command queue state error. If you can reproduce this, please report it.");
635     }
636 };
637
638 // This Command constructor is ever so slightly faster than using an object literal, but more importantly, using
639 // a named constructor helps it show up meaningfully in the V8 CPU profiler and in heap snapshots.
640 function Command(command, args, sub_command, buffer_args, callback) {
641     this.command = command;
642     this.args = args;
643     this.sub_command = sub_command;
644     this.buffer_args = buffer_args;
645     this.callback = callback;
646 }
647
648 RedisClient.prototype.send_command = function (command, args, callback) {
649     var arg, this_args, command_obj, i, il, elem_count, buffer_args, stream = this.stream, command_str = "", buffered_writes = 0, last_arg_type;
650
651     if (typeof command !== "string") {
652         throw new Error("First argument to send_command must be the command name string, not " + typeof command);
653     }
654
655     if (Array.isArray(args)) {
656         if (typeof callback === "function") {
657             // probably the fastest way:
658             //     client.command([arg1, arg2], cb);  (straight passthrough)
659             //         send_command(command, [arg1, arg2], cb);
660         } else if (! callback) {
661             // most people find this variable argument length form more convenient, but it uses arguments, which is slower
662             //     client.command(arg1, arg2, cb);   (wraps up arguments into an array)
663             //       send_command(command, [arg1, arg2, cb]);
664             //     client.command(arg1, arg2);   (callback is optional)
665             //       send_command(command, [arg1, arg2]);
666             //     client.command(arg1, arg2, undefined);   (callback is undefined)
667             //       send_command(command, [arg1, arg2, undefined]);
668             last_arg_type = typeof args[args.length - 1];
669             if (last_arg_type === "function" || last_arg_type === "undefined") {
670                 callback = args.pop();
671             }
672         } else {
673             throw new Error("send_command: last argument must be a callback or undefined");
674         }
675     } else {
676         throw new Error("send_command: second argument must be an array");
677     }
678
679     // if the last argument is an array and command is sadd, expand it out:
680     //     client.sadd(arg1, [arg2, arg3, arg4], cb);
681     //  converts to:
682     //     client.sadd(arg1, arg2, arg3, arg4, cb);
683     if ((command === 'sadd' || command === 'SADD') && args.length > 0 && Array.isArray(args[args.length - 1])) {
684         args = args.slice(0, -1).concat(args[args.length - 1]);
685     }
686
687     buffer_args = false;
688     for (i = 0, il = args.length, arg; i < il; i += 1) {
689         if (Buffer.isBuffer(args[i])) {
690             buffer_args = true;
691         }
692     }
693
694     command_obj = new Command(command, args, false, buffer_args, callback);
695
696     if ((!this.ready && !this.send_anyway) || !stream.writable) {
697         if (exports.debug_mode) {
698             if (!stream.writable) {
699                 console.log("send command: stream is not writeable.");
700             }
701         }
702
703         if (this.enable_offline_queue) {
704             if (exports.debug_mode) {
705                 console.log("Queueing " + command + " for next server connection.");
706             }
707             this.offline_queue.push(command_obj);
708             this.should_buffer = true;
709         } else {
710             var not_writeable_error = new Error('send_command: stream not writeable. enable_offline_queue is false');
711             if (command_obj.callback) {
712                 command_obj.callback(not_writeable_error);
713             } else {
714                 throw not_writeable_error;
715             }
716         }
717
718         return false;
719     }
720
721     if (command === "subscribe" || command === "psubscribe" || command === "unsubscribe" || command === "punsubscribe") {
722         this.pub_sub_command(command_obj);
723     } else if (command === "monitor") {
724         this.monitoring = true;
725     } else if (command === "quit") {
726         this.closing = true;
727     } else if (this.pub_sub_mode === true) {
728         throw new Error("Connection in pub/sub mode, only pub/sub commands may be used");
729     }
730     this.command_queue.push(command_obj);
731     this.commands_sent += 1;
732
733     elem_count = args.length + 1;
734
735     // Always use "Multi bulk commands", but if passed any Buffer args, then do multiple writes, one for each arg.
736     // This means that using Buffers in commands is going to be slower, so use Strings if you don't already have a Buffer.
737
738     command_str = "*" + elem_count + "\r\n$" + command.length + "\r\n" + command + "\r\n";
739
740     if (! buffer_args) { // Build up a string and send entire command in one write
741         for (i = 0, il = args.length, arg; i < il; i += 1) {
742             arg = args[i];
743             if (typeof arg !== "string") {
744                 arg = String(arg);
745             }
746             command_str += "$" + Buffer.byteLength(arg) + "\r\n" + arg + "\r\n";
747         }
748         if (exports.debug_mode) {
749             console.log("send " + this.host + ":" + this.port + " id " + this.connection_id + ": " + command_str);
750         }
751         buffered_writes += !stream.write(command_str);
752     } else {
753         if (exports.debug_mode) {
754             console.log("send command (" + command_str + ") has Buffer arguments");
755         }
756         buffered_writes += !stream.write(command_str);
757
758         for (i = 0, il = args.length, arg; i < il; i += 1) {
759             arg = args[i];
760             if (!(Buffer.isBuffer(arg) || arg instanceof String)) {
761                 arg = String(arg);
762             }
763
764             if (Buffer.isBuffer(arg)) {
765                 if (arg.length === 0) {
766                     if (exports.debug_mode) {
767                         console.log("send_command: using empty string for 0 length buffer");
768                     }
769                     buffered_writes += !stream.write("$0\r\n\r\n");
770                 } else {
771                     buffered_writes += !stream.write("$" + arg.length + "\r\n");
772                     buffered_writes += !stream.write(arg);
773                     buffered_writes += !stream.write("\r\n");
774                     if (exports.debug_mode) {
775                         console.log("send_command: buffer send " + arg.length + " bytes");
776                     }
777                 }
778             } else {
779                 if (exports.debug_mode) {
780                     console.log("send_command: string send " + Buffer.byteLength(arg) + " bytes: " + arg);
781                 }
782                 buffered_writes += !stream.write("$" + Buffer.byteLength(arg) + "\r\n" + arg + "\r\n");
783             }
784         }
785     }
786     if (exports.debug_mode) {
787         console.log("send_command buffered_writes: " + buffered_writes, " should_buffer: " + this.should_buffer);
788     }
789     if (buffered_writes || this.command_queue.getLength() >= this.command_queue_high_water) {
790         this.should_buffer = true;
791     }
792     return !this.should_buffer;
793 };
794
795 RedisClient.prototype.pub_sub_command = function (command_obj) {
796     var i, key, command, args;
797
798     if (this.pub_sub_mode === false && exports.debug_mode) {
799         console.log("Entering pub/sub mode from " + command_obj.command);
800     }
801     this.pub_sub_mode = true;
802     command_obj.sub_command = true;
803
804     command = command_obj.command;
805     args = command_obj.args;
806     if (command === "subscribe" || command === "psubscribe") {
807         if (command === "subscribe") {
808             key = "sub";
809         } else {
810             key = "psub";
811         }
812         for (i = 0; i < args.length; i++) {
813             this.subscription_set[key + " " + args[i]] = true;
814         }
815     } else {
816         if (command === "unsubscribe") {
817             key = "sub";
818         } else {
819             key = "psub";
820         }
821         for (i = 0; i < args.length; i++) {
822             delete this.subscription_set[key + " " + args[i]];
823         }
824     }
825 };
826
827 RedisClient.prototype.end = function () {
828     this.stream._events = {};
829     this.connected = false;
830     this.ready = false;
831     return this.stream.end();
832 };
833
834 function Multi(client, args) {
835     this.client = client;
836     this.queue = [["MULTI"]];
837     if (Array.isArray(args)) {
838         this.queue = this.queue.concat(args);
839     }
840 }
841
842 exports.Multi = Multi;
843
844 // take 2 arrays and return the union of their elements
845 function set_union(seta, setb) {
846     var obj = {};
847
848     seta.forEach(function (val) {
849         obj[val] = true;
850     });
851     setb.forEach(function (val) {
852         obj[val] = true;
853     });
854     return Object.keys(obj);
855 }
856
857 // This static list of commands is updated from time to time.  ./lib/commands.js can be updated with generate_commands.js
858 commands = set_union(["get", "set", "setnx", "setex", "append", "strlen", "del", "exists", "setbit", "getbit", "setrange", "getrange", "substr",
859     "incr", "decr", "mget", "rpush", "lpush", "rpushx", "lpushx", "linsert", "rpop", "lpop", "brpop", "brpoplpush", "blpop", "llen", "lindex",
860     "lset", "lrange", "ltrim", "lrem", "rpoplpush", "sadd", "srem", "smove", "sismember", "scard", "spop", "srandmember", "sinter", "sinterstore",
861     "sunion", "sunionstore", "sdiff", "sdiffstore", "smembers", "zadd", "zincrby", "zrem", "zremrangebyscore", "zremrangebyrank", "zunionstore",
862     "zinterstore", "zrange", "zrangebyscore", "zrevrangebyscore", "zcount", "zrevrange", "zcard", "zscore", "zrank", "zrevrank", "hset", "hsetnx",
863     "hget", "hmset", "hmget", "hincrby", "hdel", "hlen", "hkeys", "hvals", "hgetall", "hexists", "incrby", "decrby", "getset", "mset", "msetnx",
864     "randomkey", "select", "move", "rename", "renamenx", "expire", "expireat", "keys", "dbsize", "auth", "ping", "echo", "save", "bgsave",
865     "bgrewriteaof", "shutdown", "lastsave", "type", "multi", "exec", "discard", "sync", "flushdb", "flushall", "sort", "info", "monitor", "ttl",
866     "persist", "slaveof", "debug", "config", "subscribe", "unsubscribe", "psubscribe", "punsubscribe", "publish", "watch", "unwatch", "cluster",
867     "restore", "migrate", "dump", "object", "client", "eval", "evalsha"], require("./lib/commands"));
868
869 commands.forEach(function (command) {
870     RedisClient.prototype[command] = function (args, callback) {
871         if (Array.isArray(args) && typeof callback === "function") {
872             return this.send_command(command, args, callback);
873         } else {
874             return this.send_command(command, to_array(arguments));
875         }
876     };
877     RedisClient.prototype[command.toUpperCase()] = RedisClient.prototype[command];
878
879     Multi.prototype[command] = function () {
880         this.queue.push([command].concat(to_array(arguments)));
881         return this;
882     };
883     Multi.prototype[command.toUpperCase()] = Multi.prototype[command];
884 });
885
886 // store db in this.select_db to restore it on reconnect
887 RedisClient.prototype.select = function (db, callback) {
888         var self = this;
889
890         this.send_command('select', [db], function (err, res) {
891         if (err === null) {
892             self.selected_db = db;
893         }
894         if (typeof(callback) === 'function') {
895             callback(err, res);
896         }
897     });
898 };
899 RedisClient.prototype.SELECT = RedisClient.prototype.select;
900
901 // Stash auth for connect and reconnect.  Send immediately if already connected.
902 RedisClient.prototype.auth = function () {
903     var args = to_array(arguments);
904     this.auth_pass = args[0];
905     this.auth_callback = args[1];
906     if (exports.debug_mode) {
907         console.log("Saving auth as " + this.auth_pass);
908     }
909
910     if (this.connected) {
911         this.send_command("auth", args);
912     }
913 };
914 RedisClient.prototype.AUTH = RedisClient.prototype.auth;
915
916 RedisClient.prototype.hmget = function (arg1, arg2, arg3) {
917     if (Array.isArray(arg2) && typeof arg3 === "function") {
918         return this.send_command("hmget", [arg1].concat(arg2), arg3);
919     } else if (Array.isArray(arg1) && typeof arg2 === "function") {
920         return this.send_command("hmget", arg1, arg2);
921     } else {
922         return this.send_command("hmget", to_array(arguments));
923     }
924 };
925 RedisClient.prototype.HMGET = RedisClient.prototype.hmget;
926
927 RedisClient.prototype.hmset = function (args, callback) {
928     var tmp_args, tmp_keys, i, il, key;
929
930     if (Array.isArray(args) && typeof callback === "function") {
931         return this.send_command("hmset", args, callback);
932     }
933
934     args = to_array(arguments);
935     if (typeof args[args.length - 1] === "function") {
936         callback = args[args.length - 1];
937         args.length -= 1;
938     } else {
939         callback = null;
940     }
941
942     if (args.length === 2 && typeof args[0] === "string" && typeof args[1] === "object") {
943         // User does: client.hmset(key, {key1: val1, key2: val2})
944         tmp_args = [ args[0] ];
945         tmp_keys = Object.keys(args[1]);
946         for (i = 0, il = tmp_keys.length; i < il ; i++) {
947             key = tmp_keys[i];
948             tmp_args.push(key);
949             if (typeof args[1][key] !== "string") {
950                 var err = new Error("hmset expected value to be a string", key, ":", args[1][key]);
951                 if (callback) return callback(err);
952                 else throw err;
953             }
954             tmp_args.push(args[1][key]);
955         }
956         args = tmp_args;
957     }
958
959     return this.send_command("hmset", args, callback);
960 };
961 RedisClient.prototype.HMSET = RedisClient.prototype.hmset;
962
963 Multi.prototype.hmset = function () {
964     var args = to_array(arguments), tmp_args;
965     if (args.length >= 2 && typeof args[0] === "string" && typeof args[1] === "object") {
966         tmp_args = [ "hmset", args[0] ];
967         Object.keys(args[1]).map(function (key) {
968             tmp_args.push(key);
969             tmp_args.push(args[1][key]);
970         });
971         if (args[2]) {
972             tmp_args.push(args[2]);
973         }
974         args = tmp_args;
975     } else {
976         args.unshift("hmset");
977     }
978
979     this.queue.push(args);
980     return this;
981 };
982 Multi.prototype.HMSET = Multi.prototype.hmset;
983
984 Multi.prototype.exec = function (callback) {
985     var self = this;
986
987     // drain queue, callback will catch "QUEUED" or error
988     // TODO - get rid of all of these anonymous functions which are elegant but slow
989     this.queue.forEach(function (args, index) {
990         var command = args[0], obj;
991         if (typeof args[args.length - 1] === "function") {
992             args = args.slice(1, -1);
993         } else {
994             args = args.slice(1);
995         }
996         if (args.length === 1 && Array.isArray(args[0])) {
997             args = args[0];
998         }
999         if (command.toLowerCase() === 'hmset' && typeof args[1] === 'object') {
1000             obj = args.pop();
1001             Object.keys(obj).forEach(function (key) {
1002                 args.push(key);
1003                 args.push(obj[key]);
1004             });
1005         }
1006         this.client.send_command(command, args, function (err, reply) {
1007             if (err) {
1008                 var cur = self.queue[index];
1009                 if (typeof cur[cur.length - 1] === "function") {
1010                     cur[cur.length - 1](err);
1011                 } else {
1012                     throw new Error(err);
1013                 }
1014                 self.queue.splice(index, 1);
1015             }
1016         });
1017     }, this);
1018
1019     // TODO - make this callback part of Multi.prototype instead of creating it each time
1020     return this.client.send_command("EXEC", [], function (err, replies) {
1021         if (err) {
1022             if (callback) {
1023                 callback(new Error(err));
1024                 return;
1025             } else {
1026                 throw new Error(err);
1027             }
1028         }
1029
1030         var i, il, j, jl, reply, args;
1031
1032         if (replies) {
1033             for (i = 1, il = self.queue.length; i < il; i += 1) {
1034                 reply = replies[i - 1];
1035                 args = self.queue[i];
1036
1037                 // TODO - confusing and error-prone that hgetall is special cased in two places
1038                 if (reply && args[0].toLowerCase() === "hgetall") {
1039                     replies[i - 1] = reply = reply_to_object(reply);
1040                 }
1041
1042                 if (typeof args[args.length - 1] === "function") {
1043                     args[args.length - 1](null, reply);
1044                 }
1045             }
1046         }
1047
1048         if (callback) {
1049             callback(null, replies);
1050         }
1051     });
1052 };
1053 Multi.prototype.EXEC = Multi.prototype.exec;
1054
1055 RedisClient.prototype.multi = function (args) {
1056     return new Multi(this, args);
1057 };
1058 RedisClient.prototype.MULTI = function (args) {
1059     return new Multi(this, args);
1060 };
1061
1062
1063 // stash original eval method
1064 var eval = RedisClient.prototype.eval;
1065 // hook eval with an attempt to evalsha for cached scripts
1066 RedisClient.prototype.eval =
1067 RedisClient.prototype.EVAL = function () {
1068     var self = this,
1069         args = to_array(arguments),
1070         callback;
1071
1072     if (typeof args[args.length - 1] === "function") {
1073         callback = args.pop();
1074     }
1075
1076     // replace script source with sha value
1077     var source = args[0];
1078     args[0] = crypto.createHash("sha1").update(source).digest("hex");
1079
1080     self.evalsha(args, function (err, reply) {
1081         if (err && /NOSCRIPT/.test(err.message)) {
1082             args[0] = source;
1083             eval.call(self, args, callback);
1084
1085         } else if (callback) {
1086             callback(err, reply);
1087         }
1088     });
1089 };
1090
1091
1092 exports.createClient = function (port_arg, host_arg, options) {
1093     var port = port_arg || default_port,
1094         host = host_arg || default_host,
1095         redis_client, net_client;
1096
1097     net_client = net.createConnection(port, host);
1098
1099     redis_client = new RedisClient(net_client, options);
1100
1101     redis_client.port = port;
1102     redis_client.host = host;
1103
1104     return redis_client;
1105 };
1106
1107 exports.print = function (err, reply) {
1108     if (err) {
1109         console.log("Error: " + err);
1110     } else {
1111         console.log("Reply: " + reply);
1112     }
1113 };