2 * ws: a node.js websocket client
3 * Copyright(c) 2011 Einar Otto Stangvik <einaros@gmail.com>
7 var util = require('util')
8 , events = require('events')
9 , http = require('http')
10 , https = require('https')
11 , crypto = require('crypto')
12 , url = require('url')
13 , stream = require('stream')
14 , Options = require('options')
15 , Sender = require('./Sender')
16 , Receiver = require('./Receiver')
17 , SenderHixie = require('./Sender.hixie')
18 , ReceiverHixie = require('./Receiver.hixie');
24 // Default protocol version
26 var protocolVersion = 13;
30 var closeTimeout = 30000; // Allow 5 seconds to terminate the connection cleanly
33 * WebSocket implementation
36 function WebSocket(address, protocols, options) {
38 if (protocols && !Array.isArray(protocols) && 'object' == typeof protocols) {
39 // accept the "options" Object as the 2nd argument
43 if ('string' == typeof protocols) {
44 protocols = [ protocols ];
46 if (!Array.isArray(protocols)) {
49 // TODO: actually handle the `Sub-Protocols` part of the WebSocket client
52 this.bytesReceived = 0;
53 this.readyState = null;
56 if (Array.isArray(address)) {
57 initAsServerClient.apply(this, address.concat(options));
59 initAsClient.apply(this, [address, protocols, options]);
64 * Inherits from EventEmitter.
67 util.inherits(WebSocket, events.EventEmitter);
73 ["CONNECTING", "OPEN", "CLOSING", "CLOSED"].forEach(function (state, index) {
74 WebSocket.prototype[state] = WebSocket[state] = index;
78 * Gracefully closes the connection, after sending a description message to the server
80 * @param {Object} data to be sent to the server
84 WebSocket.prototype.close = function(code, data) {
85 if (this.readyState == WebSocket.CLOSING || this.readyState == WebSocket.CLOSED) return;
86 if (this.readyState == WebSocket.CONNECTING) {
87 this.readyState = WebSocket.CLOSED;
91 this.readyState = WebSocket.CLOSING;
92 this._closeCode = code;
93 this._closeMessage = data;
94 var mask = !this._isServer;
95 this._sender.close(code, data, mask);
98 this.emit('error', e);
106 * Pause the client stream
111 WebSocket.prototype.pause = function() {
112 if (this.readyState != WebSocket.OPEN) throw new Error('not opened');
113 return this._socket.pause();
119 * @param {Object} data to be sent to the server
120 * @param {Object} Members - mask: boolean, binary: boolean
121 * @param {boolean} dontFailWhenClosed indicates whether or not to throw if the connection isnt open
125 WebSocket.prototype.ping = function(data, options, dontFailWhenClosed) {
126 if (this.readyState != WebSocket.OPEN) {
127 if (dontFailWhenClosed === true) return;
128 throw new Error('not opened');
130 options = options || {};
131 if (typeof options.mask == 'undefined') options.mask = !this._isServer;
132 this._sender.ping(data, options);
138 * @param {Object} data to be sent to the server
139 * @param {Object} Members - mask: boolean, binary: boolean
140 * @param {boolean} dontFailWhenClosed indicates whether or not to throw if the connection isnt open
144 WebSocket.prototype.pong = function(data, options, dontFailWhenClosed) {
145 if (this.readyState != WebSocket.OPEN) {
146 if (dontFailWhenClosed === true) return;
147 throw new Error('not opened');
149 options = options || {};
150 if (typeof options.mask == 'undefined') options.mask = !this._isServer;
151 this._sender.pong(data, options);
155 * Resume the client stream
160 WebSocket.prototype.resume = function() {
161 if (this.readyState != WebSocket.OPEN) throw new Error('not opened');
162 return this._socket.resume();
166 * Sends a piece of data
168 * @param {Object} data to be sent to the server
169 * @param {Object} Members - mask: boolean, binary: boolean
170 * @param {function} Optional callback which is executed after the send completes
174 WebSocket.prototype.send = function(data, options, cb) {
175 if (typeof options == 'function') {
179 if (this.readyState != WebSocket.OPEN) {
180 if (typeof cb == 'function') cb(new Error('not opened'));
181 else throw new Error('not opened');
184 if (!data) data = '';
187 this._queue.push(function() { self.send(data, options, cb); });
190 options = options || {};
192 if (typeof options.binary == 'undefined') {
193 options.binary = (data instanceof ArrayBuffer || data instanceof Buffer ||
194 data instanceof Uint8Array ||
195 data instanceof Uint16Array ||
196 data instanceof Uint32Array ||
197 data instanceof Int8Array ||
198 data instanceof Int16Array ||
199 data instanceof Int32Array ||
200 data instanceof Float32Array ||
201 data instanceof Float64Array);
203 if (typeof options.mask == 'undefined') options.mask = !this._isServer;
204 var readable = typeof stream.Readable == 'function' ? stream.Readable : stream.Stream;
205 if (data instanceof readable) {
208 sendStream(this, data, options, function(error) {
209 process.nextTick(function() { executeQueueSends(self); });
210 if (typeof cb == 'function') cb(error);
213 else this._sender.send(data, options, cb);
217 * Streams data through calls to a user supplied function
219 * @param {Object} Members - mask: boolean, binary: boolean
220 * @param {function} 'function (error, send)' which is executed on successive ticks of which send is 'function (data, final)'.
224 WebSocket.prototype.stream = function(options, cb) {
225 if (typeof options == 'function') {
230 if (typeof cb != 'function') throw new Error('callback must be provided');
231 if (this.readyState != WebSocket.OPEN) {
232 if (typeof cb == 'function') cb(new Error('not opened'));
233 else throw new Error('not opened');
237 this._queue.push(function() { self.stream(options, cb); });
240 options = options || {};
241 if (typeof options.mask == 'undefined') options.mask = !this._isServer;
243 var send = function(data, final) {
245 if (self.readyState != WebSocket.OPEN) throw new Error('not opened');
246 options.fin = final === true;
247 self._sender.send(data, options);
248 if (!final) process.nextTick(cb.bind(null, null, send));
249 else executeQueueSends(self);
252 if (typeof cb == 'function') cb(e);
255 self.emit('error', e);
259 process.nextTick(cb.bind(null, null, send));
263 * Immediately shuts down the connection
268 WebSocket.prototype.terminate = function() {
269 if (this.readyState == WebSocket.CLOSED) return;
272 // End the connection
276 // Socket error during end() call, so just destroy it right now
277 cleanupWebsocketResources.call(this, true);
281 // Add a timeout to ensure that the connection is completely
282 // cleaned up within 30 seconds, even if the clean close procedure
283 // fails for whatever reason
284 this._closeTimer = setTimeout(cleanupWebsocketResources.bind(this, true), closeTimeout);
286 else if (this.readyState == WebSocket.CONNECTING) {
287 cleanupWebsocketResources.call(this, true);
292 * Expose bufferedAmount
297 Object.defineProperty(WebSocket.prototype, 'bufferedAmount', {
298 get: function get() {
301 amount = this._socket.bufferSize || 0;
308 * Emulates the W3C Browser based WebSocket interface using function members.
310 * @see http://dev.w3.org/html5/websockets/#the-websocket-interface
314 ['open', 'error', 'close', 'message'].forEach(function(method) {
315 Object.defineProperty(WebSocket.prototype, 'on' + method, {
317 * Returns the current listener
319 * @returns {Mixed} the set function or undefined
323 get: function get() {
324 var listener = this.listeners(method)[0];
325 return listener ? (listener._listener ? listener._listener : listener) : undefined;
329 * Start listening for events
331 * @param {Function} listener the listener
332 * @returns {Mixed} the set function or undefined
336 set: function set(listener) {
337 this.removeAllListeners(method);
338 this.addEventListener(method, listener);
344 * Emulates the W3C Browser based WebSocket interface using addEventListener.
346 * @see https://developer.mozilla.org/en/DOM/element.addEventListener
347 * @see http://dev.w3.org/html5/websockets/#the-websocket-interface
350 WebSocket.prototype.addEventListener = function(method, listener) {
352 if (typeof listener === 'function') {
353 if (method === 'message') {
354 function onMessage (data, flags) {
355 listener.call(this, new MessageEvent(data, flags.binary ? 'Binary' : 'Text', target));
357 // store a reference so we can return the original function from the addEventListener hook
358 onMessage._listener = listener;
359 this.on(method, onMessage);
360 } else if (method === 'close') {
361 function onClose (code, message) {
362 listener.call(this, new CloseEvent(code, message, target));
364 // store a reference so we can return the original function from the addEventListener hook
365 onClose._listener = listener;
366 this.on(method, onClose);
367 } else if (method === 'error') {
368 function onError (event) {
369 event.target = target;
370 listener.call(this, event);
372 // store a reference so we can return the original function from the addEventListener hook
373 onError._listener = listener;
374 this.on(method, onError);
375 } else if (method === 'open') {
377 listener.call(this, new OpenEvent(target));
379 // store a reference so we can return the original function from the addEventListener hook
380 onOpen._listener = listener;
381 this.on(method, onOpen);
383 this.on(method, listener);
388 module.exports = WebSocket;
393 * @see http://www.w3.org/TR/html5/comms.html
397 function MessageEvent(dataArg, typeArg, target) {
400 this.target = target;
406 * @see http://www.w3.org/TR/html5/comms.html
410 function CloseEvent(code, reason, target) {
411 this.wasClean = (typeof code == 'undefined' || code == 1000);
413 this.reason = reason;
414 this.target = target;
420 * @see http://www.w3.org/TR/html5/comms.html
424 function OpenEvent(target) {
425 this.target = target;
429 * Entirely private apis,
430 * which may or may not be bound to a sepcific WebSocket instance.
433 function initAsServerClient(req, socket, upgradeHead, options) {
434 options = new Options({
435 protocolVersion: protocolVersion,
439 // expose state properties
440 this.protocol = options.value.protocol;
441 this.protocolVersion = options.value.protocolVersion;
442 this.supports.binary = (this.protocolVersion != 'hixie-76');
443 this.upgradeReq = req;
444 this.readyState = WebSocket.CONNECTING;
445 this._isServer = true;
447 // establish connection
448 if (options.value.protocolVersion == 'hixie-76') establishConnection.call(this, ReceiverHixie, SenderHixie, socket, upgradeHead);
449 else establishConnection.call(this, Receiver, Sender, socket, upgradeHead);
452 function initAsClient(address, protocols, options) {
453 options = new Options({
455 protocolVersion: protocolVersion,
461 // ssl-related options
468 rejectUnauthorized: null
470 if (options.value.protocolVersion != 8 && options.value.protocolVersion != 13) {
471 throw new Error('unsupported protocol version');
474 // verify url and establish http class
475 var serverUrl = url.parse(address);
476 var isUnixSocket = serverUrl.protocol === 'ws+unix:';
477 if (!serverUrl.host && !isUnixSocket) throw new Error('invalid url');
478 var isSecure = serverUrl.protocol === 'wss:' || serverUrl.protocol === 'https:';
479 var httpObj = isSecure ? https : http;
480 var port = serverUrl.port || (isSecure ? 443 : 80);
481 var auth = serverUrl.auth;
483 // expose state properties
484 this._isServer = false;
486 this.protocolVersion = options.value.protocolVersion;
487 this.supports.binary = (this.protocolVersion != 'hixie-76');
490 var key = new Buffer(options.value.protocolVersion + '-' + Date.now()).toString('base64');
491 var shasum = crypto.createHash('sha1');
492 shasum.update(key + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11');
493 var expectedServerKey = shasum.digest('base64');
495 var agent = options.value.agent;
497 var headerHost = serverUrl.hostname;
498 // Append port number to Host and Origin header, only if specified in the url and non-default
500 if((isSecure && (port != 443)) || (!isSecure && (port != 80))){
501 headerHost = headerHost + ':' + port;
505 var requestOptions = {
507 host: serverUrl.hostname,
509 'Connection': 'Upgrade',
510 'Upgrade': 'websocket',
512 'Origin': headerHost,
513 'Sec-WebSocket-Version': options.value.protocolVersion,
514 'Sec-WebSocket-Key': key
518 // If we have basic auth.
520 requestOptions.headers['Authorization'] = 'Basic ' + new Buffer(auth).toString('base64');
523 if (options.value.protocol) {
524 requestOptions.headers['Sec-WebSocket-Protocol'] = options.value.protocol;
527 if (options.value.host) {
528 requestOptions.headers['Host'] = options.value.host;
531 if (options.value.headers) {
532 for (var header in options.value.headers) {
533 if (options.value.headers.hasOwnProperty(header)) {
534 requestOptions.headers[header] = options.value.headers[header];
539 if (options.isDefinedAndNonNull('pfx')
540 || options.isDefinedAndNonNull('key')
541 || options.isDefinedAndNonNull('passphrase')
542 || options.isDefinedAndNonNull('cert')
543 || options.isDefinedAndNonNull('ca')
544 || options.isDefinedAndNonNull('ciphers')
545 || options.isDefinedAndNonNull('rejectUnauthorized')) {
547 if (options.isDefinedAndNonNull('pfx')) requestOptions.pfx = options.value.pfx;
548 if (options.isDefinedAndNonNull('key')) requestOptions.key = options.value.key;
549 if (options.isDefinedAndNonNull('passphrase')) requestOptions.passphrase = options.value.passphrase;
550 if (options.isDefinedAndNonNull('cert')) requestOptions.cert = options.value.cert;
551 if (options.isDefinedAndNonNull('ca')) requestOptions.ca = options.value.ca;
552 if (options.isDefinedAndNonNull('ciphers')) requestOptions.ciphers = options.value.ciphers;
553 if (options.isDefinedAndNonNull('rejectUnauthorized')) requestOptions.rejectUnauthorized = options.value.rejectUnauthorized;
556 // global agent ignores client side certificates
557 agent = new httpObj.Agent(requestOptions);
561 requestOptions.path = serverUrl.path || '/';
564 requestOptions.agent = agent;
568 requestOptions.socketPath = serverUrl.pathname;
570 if (options.value.origin) {
571 if (options.value.protocolVersion < 13) requestOptions.headers['Sec-WebSocket-Origin'] = options.value.origin;
572 else requestOptions.headers['Origin'] = options.value.origin;
576 var req = httpObj.request(requestOptions);
578 req.on('error', function(error) {
579 self.emit('error', error);
580 cleanupWebsocketResources.call(this, error);
583 req.once('response', function(res) {
584 if (!self.emit('unexpected-response', req, res)) {
585 var error = new Error('unexpected server response (' + res.statusCode + ')');
587 self.emit('error', error);
589 cleanupWebsocketResources.call(this, error);
592 req.once('upgrade', function(res, socket, upgradeHead) {
593 if (self.readyState == WebSocket.CLOSED) {
594 // client closed before server accepted connection
596 self.removeAllListeners();
600 var serverKey = res.headers['sec-websocket-accept'];
601 if (typeof serverKey == 'undefined' || serverKey !== expectedServerKey) {
602 self.emit('error', 'invalid server key');
603 self.removeAllListeners();
608 var serverProt = res.headers['sec-websocket-protocol'];
609 var protList = (options.value.protocol || "").split(/, */);
610 var protError = null;
611 if (!options.value.protocol && serverProt) {
612 protError = 'server sent a subprotocol even though none requested';
613 } else if (options.value.protocol && !serverProt) {
614 protError = 'server sent no subprotocol even though requested';
615 } else if (serverProt && protList.indexOf(serverProt) === -1) {
616 protError = 'server responded with an invalid protocol';
619 self.emit('error', protError);
620 self.removeAllListeners();
623 } else if (serverProt) {
624 self.protocol = serverProt;
627 establishConnection.call(self, Receiver, Sender, socket, upgradeHead);
629 // perform cleanup on http resources
630 req.removeAllListeners();
636 this.readyState = WebSocket.CONNECTING;
639 function establishConnection(ReceiverClass, SenderClass, socket, upgradeHead) {
640 this._socket = socket;
641 socket.setTimeout(0);
642 socket.setNoDelay(true);
644 this._receiver = new ReceiverClass();
646 // socket cleanup handlers
647 socket.on('end', cleanupWebsocketResources.bind(this));
648 socket.on('close', cleanupWebsocketResources.bind(this));
649 socket.on('error', cleanupWebsocketResources.bind(this));
651 // ensure that the upgradeHead is added to the receiver
652 function firstHandler(data) {
653 if (self.readyState != WebSocket.OPEN) return;
654 if (upgradeHead && upgradeHead.length > 0) {
655 self.bytesReceived += upgradeHead.length;
656 var head = upgradeHead;
658 self._receiver.add(head);
660 dataHandler = realHandler;
662 self.bytesReceived += data.length;
663 self._receiver.add(data);
666 // subsequent packets are pushed straight to the receiver
667 function realHandler(data) {
668 if (data) self.bytesReceived += data.length;
669 self._receiver.add(data);
671 var dataHandler = firstHandler;
672 // if data was passed along with the http upgrade,
673 // this will schedule a push of that on to the receiver.
674 // this has to be done on next tick, since the caller
675 // hasn't had a chance to set event handlers on this client
677 process.nextTick(firstHandler);
679 // receiver event handlers
680 self._receiver.ontext = function (data, flags) {
682 self.emit('message', data, flags);
684 self._receiver.onbinary = function (data, flags) {
687 self.emit('message', data, flags);
689 self._receiver.onping = function(data, flags) {
691 self.pong(data, {mask: !self._isServer, binary: flags.binary === true}, true);
692 self.emit('ping', data, flags);
694 self._receiver.onpong = function(data, flags) {
695 self.emit('pong', data, flags);
697 self._receiver.onclose = function(code, data, flags) {
699 self.close(code, data);
701 self._receiver.onerror = function(reason, errorCode) {
702 // close the connection when the receiver reports a HyBi error code
703 self.close(typeof errorCode != 'undefined' ? errorCode : 1002, '');
704 self.emit('error', reason, errorCode);
707 // finalize the client
708 this._sender = new SenderClass(socket);
709 this._sender.on('error', function(error) {
710 self.close(1002, '');
711 self.emit('error', error);
713 this.readyState = WebSocket.OPEN;
716 socket.on('data', dataHandler);
719 function startQueue(instance) {
720 instance._queue = instance._queue || [];
723 function executeQueueSends(instance) {
724 var queue = instance._queue;
725 if (typeof queue == 'undefined') return;
726 delete instance._queue;
727 for (var i = 0, l = queue.length; i < l; ++i) {
732 function sendStream(instance, stream, options, cb) {
733 stream.on('data', function(data) {
734 if (instance.readyState != WebSocket.OPEN) {
735 if (typeof cb == 'function') cb(new Error('not opened'));
737 delete instance._queue;
738 instance.emit('error', new Error('not opened'));
743 instance._sender.send(data, options);
745 stream.on('end', function() {
746 if (instance.readyState != WebSocket.OPEN) {
747 if (typeof cb == 'function') cb(new Error('not opened'));
749 delete instance._queue;
750 instance.emit('error', new Error('not opened'));
755 instance._sender.send(null, options);
756 if (typeof cb == 'function') cb(null);
760 function cleanupWebsocketResources(error) {
761 if (this.readyState == WebSocket.CLOSED) return;
762 var emitClose = this.readyState != WebSocket.CONNECTING;
763 this.readyState = WebSocket.CLOSED;
765 clearTimeout(this._closeTimer);
766 this._closeTimer = null;
767 if (emitClose) this.emit('close', this._closeCode || 1000, this._closeMessage || '');
770 this._socket.removeAllListeners();
771 // catch all socket error after removing all standard handlers
772 var socket = this._socket;
773 this._socket.on('error', function() {
774 try { socket.destroy(); } catch (e) {}
777 if (!error) this._socket.end();
778 else this._socket.destroy();
780 catch (e) { /* Ignore termination errors */ }
784 this._sender.removeAllListeners();
787 if (this._receiver) {
788 this._receiver.cleanup();
789 this._receiver = null;
791 this.removeAllListeners();
792 this.on('error', function() {}); // catch all errors after this