5 var util = require('./util')
6 , transports = require('./transports')
7 , Emitter = require('./emitter')
8 , debug = require('debug')('engine-client:socket');
14 module.exports = Socket;
20 var global = 'undefined' != typeof window ? window : global;
25 * @param {Object} options
29 function Socket(opts){
30 if (!(this instanceof Socket)) return new Socket(opts);
32 if ('string' == typeof opts) {
33 var uri = util.parseUri(opts);
34 opts = arguments[1] || {};
36 opts.secure = uri.protocol == 'https' || uri.protocol == 'wss';
41 this.secure = null != opts.secure ? opts.secure : (global.location && 'https:' == location.protocol);
42 this.host = opts.host || opts.hostname || (global.location ? location.hostname : 'localhost');
43 this.port = opts.port || (global.location && location.port ? location.port : (this.secure ? 443 : 80));
44 this.query = opts.query || {};
45 this.query.uid = rnd();
46 this.upgrade = false !== opts.upgrade;
47 this.resource = opts.resource || 'default';
48 this.path = (opts.path || '/engine.io').replace(/\/$/, '');
49 this.path += '/' + this.resource + '/';
50 this.forceJSONP = !!opts.forceJSONP;
51 this.timestampParam = opts.timestampParam || 't';
52 this.timestampRequests = !!opts.timestampRequests;
53 this.flashPath = opts.flashPath || '';
54 this.transports = opts.transports || ['polling', 'websocket', 'flashsocket'];
56 this.writeBuffer = [];
57 this.policyPort = opts.policyPort || 843;
60 Socket.sockets.push(this);
61 Socket.sockets.evs.emit('add', this);
68 Emitter(Socket.prototype);
79 * Static EventEmitter.
83 Socket.sockets.evs = new Emitter;
86 * Expose deps for legacy compatibility
87 * and standalone browser access.
90 Socket.Socket = Socket;
91 Socket.Transport = require('./transport');
92 Socket.Emitter = require('./emitter');
93 Socket.transports = require('./transports');
94 Socket.util = require('./util');
95 Socket.parser = require('./parser');
98 * Creates transport of the given type.
100 * @param {String} transport name
101 * @return {Transport}
105 Socket.prototype.createTransport = function (name) {
106 debug('creating transport "%s"', name);
107 var query = clone(this.query);
108 query.transport = name;
114 var transport = new transports[name]({
117 , secure: this.secure
120 , forceJSONP: this.forceJSONP
121 , timestampRequests: this.timestampRequests
122 , timestampParam: this.timestampParam
123 , flashPath: this.flashPath
124 , policyPort: this.policyPort
130 function clone (obj) {
133 if (obj.hasOwnProperty(i)) {
141 * Initializes transport to use and starts probe.
146 Socket.prototype.open = function () {
147 this.readyState = 'opening';
148 var transport = this.createTransport(this.transports[0]);
150 this.setTransport(transport);
154 * Sets the current transport. Disables the existing one (if any).
159 Socket.prototype.setTransport = function (transport) {
162 if (this.transport) {
163 debug('clearing existing transport');
164 this.transport.removeAllListeners();
168 this.transport = transport;
170 // set up transport listeners
172 .on('drain', function () {
175 .on('packet', function (packet) {
176 self.onPacket(packet);
178 .on('error', function (e) {
181 .on('close', function () {
182 self.onClose('transport close');
187 * Probes a transport.
189 * @param {String} transport name
193 Socket.prototype.probe = function (name) {
194 debug('probing transport "%s"', name);
195 var transport = this.createTransport(name, { probe: 1 })
199 transport.once('open', function () {
202 debug('probe transport "%s" opened', name);
203 transport.send([{ type: 'ping', data: 'probe' }]);
204 transport.once('packet', function (msg) {
206 if ('pong' == msg.type && 'probe' == msg.data) {
207 debug('probe transport "%s" pong', name);
208 self.upgrading = true;
209 self.emit('upgrading', transport);
211 debug('pausing current transport "%s"', self.transport.name);
212 self.transport.pause(function () {
214 if ('closed' == self.readyState || 'closing' == self.readyState) {
217 debug('changing transport and sending upgrade packet');
218 transport.removeListener('error', onerror);
219 self.emit('upgrade', transport);
220 self.setTransport(transport);
221 transport.send([{ type: 'upgrade' }]);
223 self.upgrading = false;
227 debug('probe transport "%s" failed', name);
228 var err = new Error('probe error');
229 err.transport = transport.name;
230 self.emit('error', err);
235 transport.once('error', onerror);
236 function onerror(err) {
239 // Any callback called by transport should be ignored since now
242 var error = new Error('probe error: ' + err);
243 error.transport = transport.name;
248 debug('probe transport "%s" failed because of error: %s', name, err);
250 self.emit('error', error);
255 this.once('close', function () {
257 debug('socket closed prematurely - aborting probe');
264 this.once('upgrading', function (to) {
265 if (transport && to.name != transport.name) {
266 debug('"%s" works - aborting "%s"', to.name, transport.name);
274 * Called when connection is deemed open.
279 Socket.prototype.onOpen = function () {
280 debug('socket open');
281 this.readyState = 'open';
283 this.onopen && this.onopen.call(this);
286 // we check for `readyState` in case an `open`
287 // listener alreay closed the socket
288 if ('open' == this.readyState && this.upgrade && this.transport.pause) {
289 debug('starting upgrade probes');
290 for (var i = 0, l = this.upgrades.length; i < l; i++) {
291 this.probe(this.upgrades[i]);
302 Socket.prototype.onPacket = function (packet) {
303 if ('opening' == this.readyState || 'open' == this.readyState) {
304 debug('socket receive: type "%s", data "%s"', packet.type, packet.data);
306 this.emit('packet', packet);
308 // Socket is live - any packet counts
309 this.emit('heartbeat');
311 switch (packet.type) {
313 this.onHandshake(util.parseJSON(packet.data));
321 var err = new Error('server error');
322 err.code = packet.data;
323 this.emit('error', err);
327 this.emit('message', packet.data);
328 var event = { data: packet.data };
329 event.toString = function () {
332 this.onmessage && this.onmessage.call(this, event);
336 debug('packet received with socket readyState "%s"', this.readyState);
341 * Called upon handshake completion.
343 * @param {Object} handshake obj
347 Socket.prototype.onHandshake = function (data) {
348 this.emit('handshake', data);
350 this.transport.query.sid = data.sid;
351 this.upgrades = data.upgrades;
352 this.pingInterval = data.pingInterval;
353 this.pingTimeout = data.pingTimeout;
357 // Prolong liveness of socket on heartbeat
358 this.removeListener('heartbeat', this.onHeartbeat);
359 this.on('heartbeat', this.onHeartbeat);
363 * Resets ping timeout.
368 Socket.prototype.onHeartbeat = function (timeout) {
369 clearTimeout(this.pingTimeoutTimer);
371 self.pingTimeoutTimer = setTimeout(function () {
372 if ('closed' == self.readyState) return;
373 self.onClose('ping timeout');
374 }, timeout || (self.pingInterval + self.pingTimeout));
378 * Pings server every `this.pingInterval` and expects response
379 * within `this.pingTimeout` or closes connection.
384 Socket.prototype.ping = function () {
386 clearTimeout(self.pingIntervalTimer);
387 self.pingIntervalTimer = setTimeout(function () {
388 debug('writing ping packet - expecting pong within %sms', self.pingTimeout);
389 self.sendPacket('ping');
390 self.onHeartbeat(self.pingTimeout);
391 }, self.pingInterval);
395 * Flush write buffers.
400 Socket.prototype.flush = function () {
401 if ('closed' != this.readyState && this.transport.writable &&
402 !this.upgrading && this.writeBuffer.length) {
403 debug('flushing %d packets in socket', this.writeBuffer.length);
404 this.transport.send(this.writeBuffer);
405 this.writeBuffer = [];
412 * @param {String} message.
413 * @return {Socket} for chaining.
417 Socket.prototype.write =
418 Socket.prototype.send = function (msg) {
419 this.sendPacket('message', msg);
426 * @param {String} packet type.
427 * @param {String} data.
431 Socket.prototype.sendPacket = function (type, data) {
432 var packet = { type: type, data: data };
433 this.emit('packetCreate', packet);
434 this.writeBuffer.push(packet);
439 * Closes the connection.
444 Socket.prototype.close = function () {
445 if ('opening' == this.readyState || 'open' == this.readyState) {
446 this.onClose('forced close');
447 debug('socket closing - telling transport to close');
448 this.transport.close();
449 this.transport.removeAllListeners();
456 * Called upon transport error
461 Socket.prototype.onError = function (err) {
462 this.emit('error', err);
463 this.onClose('transport error', err);
467 * Called upon transport close.
472 Socket.prototype.onClose = function (reason, desc) {
473 if ('closed' != this.readyState) {
474 debug('socket close with reason: "%s"', reason);
475 clearTimeout(this.pingIntervalTimer);
476 clearTimeout(this.pingTimeoutTimer);
477 this.readyState = 'closed';
478 this.emit('close', reason, desc);
479 this.onclose && this.onclose.call(this);
485 * Generates a random uid.
491 return String(Math.random()).substr(5) + String(Math.random()).substr(5);