4 * Copyright(c) 2011 LearnBoost <dev@learnboost.com>
12 var parser = require('./parser');
15 * Expose the constructor.
18 exports = module.exports = Transport;
21 * Transport constructor.
26 function Transport (mng, data, req) {
29 this.disconnected = false;
31 this.handleRequest(req);
40 Transport.prototype.__defineGetter__('log', function () {
41 return this.manager.log;
50 Transport.prototype.__defineGetter__('store', function () {
51 return this.manager.store;
55 * Handles a request when it's set.
60 Transport.prototype.handleRequest = function (req) {
61 this.log.debug('setting request', req.method, req.url);
64 if (req.method == 'GET') {
65 this.socket = req.socket;
68 this.setHeartbeatInterval();
71 this.onSocketConnect();
76 * Called when a connection is first set.
// Intentionally empty default hook; it is invoked from handleRequest.
// NOTE(review): presumably meant to be overridden by concrete transports — confirm.
81 Transport.prototype.onSocketConnect = function () { };
84 * Sets transport handlers
89 Transport.prototype.setHandlers = function () {
92 // we need to do this in a pub/sub way since the client can POST the message
93 // over a different socket (ie: different Transport instance)
94 this.store.subscribe('heartbeat-clear:' + this.id, function () {
95 self.onHeartbeatClear();
98 this.store.subscribe('disconnect-force:' + this.id, function () {
99 self.onForcedDisconnect();
102 this.store.subscribe('dispatch:' + this.id, function (packet, volatile) {
103 self.onDispatch(packet, volatile);
107 end: this.onSocketEnd.bind(this)
108 , close: this.onSocketClose.bind(this)
109 , error: this.onSocketError.bind(this)
110 , drain: this.onSocketDrain.bind(this)
113 this.socket.on('end', this.bound.end);
114 this.socket.on('close', this.bound.close);
115 this.socket.on('error', this.bound.error);
116 this.socket.on('drain', this.bound.drain);
118 this.handlersSet = true;
122 * Removes transport handlers
127 Transport.prototype.clearHandlers = function () {
128 if (this.handlersSet) {
129 this.store.unsubscribe('disconnect-force:' + this.id);
130 this.store.unsubscribe('heartbeat-clear:' + this.id);
131 this.store.unsubscribe('dispatch:' + this.id);
133 this.socket.removeListener('end', this.bound.end);
134 this.socket.removeListener('close', this.bound.close);
135 this.socket.removeListener('error', this.bound.error);
136 this.socket.removeListener('drain', this.bound.drain);
141 * Called when the socket emits `end`.
146 Transport.prototype.onSocketEnd = function () {
147 this.end('socket end');
151 * Called when the socket emits `close`; `error` is truthy if it closed because of an error.
156 Transport.prototype.onSocketClose = function (error) {
157 this.end(error ? 'socket error' : 'socket close');
161 * Called when the connection has an error.
166 Transport.prototype.onSocketError = function (err) {
168 this.socket.destroy();
172 this.log.info('socket error ' + err.stack);
176 * Called when the connection is drained.
181 Transport.prototype.onSocketDrain = function () {
186 * Called upon receiving a heartbeat packet.
191 Transport.prototype.onHeartbeatClear = function () {
192 this.clearHeartbeatTimeout();
193 this.setHeartbeatInterval();
197 * Called upon a forced disconnection.
202 Transport.prototype.onForcedDisconnect = function () {
203 if (!this.disconnected) {
204 this.log.info('transport end by forced client disconnection');
206 this.packet({ type: 'disconnect' });
213 * Dispatches a packet.
218 Transport.prototype.onDispatch = function (packet, volatile) {
220 this.writeVolatile(packet);
227 * Sets the close timeout.
230 Transport.prototype.setCloseTimeout = function () {
231 if (!this.closeTimeout) {
234 this.closeTimeout = setTimeout(function () {
235 self.log.debug('fired close timeout for client', self.id);
236 self.closeTimeout = null;
237 self.end('close timeout');
238 }, this.manager.get('close timeout') * 1000);
240 this.log.debug('set close timeout for client', this.id);
245 * Clears the close timeout.
248 Transport.prototype.clearCloseTimeout = function () {
249 if (this.closeTimeout) {
250 clearTimeout(this.closeTimeout);
251 this.closeTimeout = null;
253 this.log.debug('cleared close timeout for client', this.id);
258 * Sets the heartbeat timeout
261 Transport.prototype.setHeartbeatTimeout = function () {
262 if (!this.heartbeatTimeout && this.manager.enabled('heartbeats')) {
265 this.heartbeatTimeout = setTimeout(function () {
266 self.log.debug('fired heartbeat timeout for client', self.id);
267 self.heartbeatTimeout = null;
268 self.end('heartbeat timeout');
269 }, this.manager.get('heartbeat timeout') * 1000);
271 this.log.debug('set heartbeat timeout for client', this.id);
276 * Clears the heartbeat timeout
281 Transport.prototype.clearHeartbeatTimeout = function () {
282 if (this.heartbeatTimeout && this.manager.enabled('heartbeats')) {
283 clearTimeout(this.heartbeatTimeout);
284 this.heartbeatTimeout = null;
285 this.log.debug('cleared heartbeat timeout for client', this.id);
290 * Sets the heartbeat interval. To be called when a connection opens and when
291 * a heartbeat is received.
296 Transport.prototype.setHeartbeatInterval = function () {
297 if (!this.heartbeatInterval && this.manager.enabled('heartbeats')) {
300 this.heartbeatInterval = setTimeout(function () {
302 self.heartbeatInterval = null;
303 }, this.manager.get('heartbeat interval') * 1000);
305 this.log.debug('set heartbeat interval for client', this.id);
310 * Clears all timeouts.
315 Transport.prototype.clearTimeouts = function () {
316 this.clearCloseTimeout();
317 this.clearHeartbeatTimeout();
318 this.clearHeartbeatInterval();
327 Transport.prototype.heartbeat = function () {
329 this.log.debug('emitting heartbeat for client', this.id);
330 this.packet({ type: 'heartbeat' });
331 this.setHeartbeatTimeout();
340 * @param {Object} packet object
344 Transport.prototype.onMessage = function (packet) {
345 var current = this.manager.transports[this.id];
347 if ('heartbeat' == packet.type) {
348 this.log.debug('got heartbeat packet');
350 if (current && current.open) {
351 current.onHeartbeatClear();
353 this.store.publish('heartbeat-clear:' + this.id);
356 if ('disconnect' == packet.type && packet.endpoint == '') {
357 this.log.debug('got disconnection packet');
360 current.onForcedDisconnect();
362 this.store.publish('disconnect-force:' + this.id);
368 if (packet.id && packet.ack != 'data') {
369 this.log.debug('acknowledging packet automatically');
371 var ack = parser.encodePacket({
374 , endpoint: packet.endpoint || ''
377 if (current && current.open) {
378 current.onDispatch(ack);
380 this.manager.onClientDispatch(this.id, ack);
381 this.store.publish('dispatch:' + this.id, ack);
385 // handle packet locally or publish it
387 this.manager.onClientMessage(this.id, packet);
389 this.store.publish('message:' + this.id, packet);
395 * Clears the heartbeat interval
400 Transport.prototype.clearHeartbeatInterval = function () {
401 if (this.heartbeatInterval && this.manager.enabled('heartbeats')) {
402 clearTimeout(this.heartbeatInterval);
403 this.heartbeatInterval = null;
404 this.log.debug('cleared heartbeat interval for client', this.id);
409 * Finishes the connection and makes sure client doesn't reopen
414 Transport.prototype.disconnect = function (reason) {
415 this.packet({ type: 'disconnect' });
422 * Closes the connection.
427 Transport.prototype.close = function () {
435 * Called upon a connection close.
440 Transport.prototype.onClose = function () {
442 this.setCloseTimeout();
443 this.clearHandlers();
445 this.manager.onClose(this.id);
446 this.store.publish('close', this.id);
451 * Cleans up the connection, considers the client disconnected.
456 Transport.prototype.end = function (reason) {
457 if (!this.disconnected) {
458 this.log.info('transport end (' + reason + ')');
460 var local = this.manager.transports[this.id];
463 this.clearTimeouts();
464 this.disconnected = true;
467 this.manager.onClientDisconnect(this.id, reason, true);
469 this.store.publish('disconnect:' + this.id, reason);
475 * Signals that the transport should pause and buffer data.
480 Transport.prototype.discard = function () {
481 this.log.debug('discarding transport');
482 this.discarded = true;
483 this.clearTimeouts();
484 this.clearHandlers();
490 * Writes an error packet with the specified reason and advice.
492 * @param {String} reason
493 * @param {String} advice
497 Transport.prototype.error = function (reason, advice) {
504 this.log.warn(reason, advice ? ('client should ' + advice) : '');
514 Transport.prototype.packet = function (obj) {
515 return this.write(parser.encodePacket(obj));
519 * Writes a volatile message.
524 Transport.prototype.writeVolatile = function (msg) {
529 this.log.debug('ignoring volatile packet, buffer not drained');
532 this.log.debug('ignoring volatile packet, transport not open');