2 * ws: a node.js websocket client
3 * Copyright(c) 2011 Einar Otto Stangvik <einaros@gmail.com>
7 var util = require('util')
8 , Validation = require('./Validation').Validation
9 , ErrorCodes = require('./ErrorCodes')
10 , BufferPool = require('./BufferPool')
11 , bufferUtil = require('./BufferUtil').BufferUtil;
14 * HyBi Receiver implementation
// Receiver constructor. Creates the fragmented and unfragmented buffer pools,
// the header scratch buffer and the expect* bookkeeping, arms the parser for
// the first two header bytes, and installs no-op defaults for the callbacks.
17 function Receiver () {
18 // memory pool for fragmented messages
// -1 marks "no previous value"; the callback below only averages once this is >= 0.
19 var fragmentedPoolPrevUsed = -1;
// NOTE(review): the meaning of the BufferPool constructor arguments is defined in ./BufferPool - confirm there.
20 this.fragmentedBufferPool = new BufferPool(1024, function(db, length) {
// db.used plus the length passed to the callback.
21 return db.used + length;
// Once a previous value exists, store and return the mean of it and db.used.
23 return fragmentedPoolPrevUsed = fragmentedPoolPrevUsed >= 0 ?
24 (fragmentedPoolPrevUsed + db.used) / 2 :
28 // memory pool for unfragmented messages
// Same scheme as above, tracked separately for the unfragmented pool.
29 var unfragmentedPoolPrevUsed = -1;
30 this.unfragmentedBufferPool = new BufferPool(1024, function(db, length) {
31 return db.used + length;
33 return unfragmentedPoolPrevUsed = unfragmentedPoolPrevUsed >= 0 ?
34 (unfragmentedPoolPrevUsed + db.used) / 2 :
// Initial values of the fields that the prototype methods read as this.state.*.
39 activeFragmentedOperation: null,
43 fragmentedOperation: false
// 10-byte scratch buffer; expectHeader() hands out slices of it.
46 this.headerBuffer = new Buffer(10);
// Pending-read bookkeeping: write offset, destination buffer, completion handler.
47 this.expectOffset = 0;
48 this.expectBuffer = null;
49 this.expectHandler = null;
// Payload pieces collected by the text/binary finish steps until the last fragment.
50 this.currentMessage = [];
// Start by waiting for two header bytes, which processPacket will handle.
51 this.expectHeader(2, this.processPacket);
// Default callbacks do nothing; users of the receiver overwrite them.
54 this.onerror = function() {};
55 this.ontext = function() {};
56 this.onbinary = function() {};
57 this.onclose = function() {};
58 this.onping = function() {};
59 this.onpong = function() {};
// The module's export is the Receiver constructor itself.
62 module.exports = Receiver;
65 * Add new data to the parser.
// Feeds a chunk of incoming data to the parser: fills the pending expect
// buffer, queues whatever does not fit, and runs the handler of every read
// that has become complete.
70 Receiver.prototype.add = function(data) {
71 var dataLength = data.length;
// Empty chunks are ignored.
72 if (dataLength == 0) return;
// No read is pending: queue the chunk on the overflow list.
73 if (this.expectBuffer == null) {
74 this.overflow.push(data);
// Copy no more than the pending buffer still has room for.
77 var toRead = Math.min(dataLength, this.expectBuffer.length - this.expectOffset);
78 fastCopy(toRead, data, this.expectBuffer, this.expectOffset);
79 this.expectOffset += toRead;
// Bytes beyond the pending read are queued for a later expectHeader/expectData call.
80 if (toRead < dataLength) {
81 this.overflow.push(data.slice(toRead));
// A handler may set up a new read that is already satisfied, hence the loop.
83 while (this.expectBuffer && this.expectOffset == this.expectBuffer.length) {
84 var bufferForHandler = this.expectBuffer;
// Clear the pending read before invoking the handler so it can register the next one.
85 this.expectBuffer = null;
86 this.expectOffset = 0;
87 this.expectHandler.call(this, bufferForHandler);
92 * Releases all resources used by the receiver.
// Releases what the receiver holds on to by setting its buffers, pools,
// pending-read handler, collected message parts and callbacks to null.
97 Receiver.prototype.cleanup = function() {
100 this.headerBuffer = null;
101 this.expectBuffer = null;
102 this.expectHandler = null;
103 this.unfragmentedBufferPool = null;
104 this.fragmentedBufferPool = null;
106 this.currentMessage = null;
109 this.onbinary = null;
116 * Waits for a certain amount of header bytes to be available, then fires a callback.
// Registers a pending read of `length` header bytes, completed by `handler`.
// The destination is a slice of the shared headerBuffer; data already queued
// on the overflow list is consumed immediately.
121 Receiver.prototype.expectHeader = function(length, handler) {
// Slice of the header scratch buffer starting at the current expect offset.
126 this.expectBuffer = this.headerBuffer.slice(this.expectOffset, this.expectOffset + length);
127 this.expectHandler = handler;
// Drain queued chunks while bytes are still needed.
129 while (toRead > 0 && this.overflow.length > 0) {
// Chunks are taken from the end of the overflow list.
130 var fromOverflow = this.overflow.pop();
// A chunk larger than needed has its unread tail pushed back.
131 if (toRead < fromOverflow.length) this.overflow.push(fromOverflow.slice(toRead));
132 var read = Math.min(fromOverflow.length, toRead);
133 fastCopy(read, fromOverflow, this.expectBuffer, this.expectOffset);
134 this.expectOffset += read;
140 * Waits for a certain amount of data bytes to be available, then fires a callback.
// Registers a pending read of `length` payload bytes, completed by `handler`.
// The destination comes from one of the buffer pools; data already queued on
// the overflow list is consumed immediately.
145 Receiver.prototype.expectData = function(length, handler) {
// The pool is chosen by whether the current frame belongs to a fragmented operation.
150 this.expectBuffer = this.allocateFromPool(length, this.state.fragmentedOperation);
151 this.expectHandler = handler;
// Drain queued chunks while bytes are still needed.
153 while (toRead > 0 && this.overflow.length > 0) {
// Chunks are taken from the end of the overflow list.
154 var fromOverflow = this.overflow.pop();
// A chunk larger than needed has its unread tail pushed back.
155 if (toRead < fromOverflow.length) this.overflow.push(fromOverflow.slice(toRead));
156 var read = Math.min(fromOverflow.length, toRead);
157 fastCopy(read, fromOverflow, this.expectBuffer, this.expectOffset);
158 this.expectOffset += read;
/**
 * Allocates memory from the buffer pool.
 *
 * @param {Number} length size passed to the pool's get()
 * @param {Boolean} isFragmented truthy selects the fragmented pool,
 *   falsy the unfragmented one
 * @return whatever the selected pool's get(length) returns
 */
Receiver.prototype.allocateFromPool = function(length, isFragmented) {
  var pool;
  if (isFragmented) {
    pool = this.fragmentedBufferPool;
  }
  else {
    pool = this.unfragmentedBufferPool;
  }
  return pool.get(length);
};
174 * Start processing a new packet.
// Handles the first two header bytes of a frame: validates the reserved bits,
// records the fragment/mask flags, resolves the effective opcode and passes
// the header to that opcode's start step.
179 Receiver.prototype.processPacket = function (data) {
// Bits 0x70 of byte 0 are the reserved fields; any set bit is reported with code 1002.
180 if ((data[0] & 0x70) != 0) {
181 this.error('reserved fields must be empty', 1002);
// Top bit of byte 0 flags the last fragment; top bit of byte 1 flags a masked payload.
184 this.state.lastFragment = (data[0] & 0x80) == 0x80;
185 this.state.masked = (data[1] & 0x80) == 0x80;
// Low four bits of byte 0 carry the opcode.
186 var opcode = data[0] & 0xf;
188 // continuation frame
// A continuation takes over the opcode of the fragmented operation in progress, which must be 1 or 2.
189 this.state.fragmentedOperation = true;
190 this.state.opcode = this.state.activeFragmentedOperation;
191 if (!(this.state.opcode == 1 || this.state.opcode == 2)) {
192 this.error('continuation frame cannot follow current opcode', 1002);
// An opcode below 3 is rejected while a fragmented operation is still active.
197 if (opcode < 3 && this.state.activeFragmentedOperation != null) {
198 this.error('data frames after the initial data frame must have opcode 0', 1002);
201 this.state.opcode = opcode;
// A non-final frame opens a fragmented operation and remembers its opcode.
202 if (this.state.lastFragment === false) {
203 this.state.fragmentedOperation = true;
204 this.state.activeFragmentedOperation = opcode;
206 else this.state.fragmentedOperation = false;
// Look up the handler table entry for the resolved opcode.
208 var handler = opcodes[this.state.opcode];
209 if (typeof handler == 'undefined') this.error('no handler for opcode ' + this.state.opcode, 1002);
// The two header bytes go to the handler's start step, with the receiver as `this`.
211 handler.start.call(this, data);
216 * End processing a packet.
// Finishes the current frame: resets the relevant buffer pool, clears the
// pending read, updates the fragmentation state and waits for the next
// two header bytes.
221 Receiver.prototype.endPacket = function() {
// Unfragmented frames reset their pool with `true`; the fragmented pool is
// reset (with `false`) only once the last fragment has arrived.
222 if (!this.state.fragmentedOperation) this.unfragmentedBufferPool.reset(true);
223 else if (this.state.lastFragment) this.fragmentedBufferPool.reset(false);
224 this.expectOffset = 0;
225 this.expectBuffer = null;
226 this.expectHandler = null;
// Only a final frame whose opcode matches the active fragmented operation ends it.
227 if (this.state.lastFragment && this.state.opcode === this.state.activeFragmentedOperation) {
228 // end current fragmented operation
229 this.state.activeFragmentedOperation = null;
231 this.state.lastFragment = false;
// While a fragmented operation remains active its opcode stays current; otherwise opcode goes back to 0.
232 this.state.opcode = this.state.activeFragmentedOperation != null ? this.state.activeFragmentedOperation : 0;
233 this.state.masked = false;
// Arm the parser for the next frame header.
234 this.expectHeader(2, this.processPacket);
238 * Reset the parser state.
// Resets the parser: state fields, both buffer pools, the pending read and
// the collected message parts. Does nothing when this.dead is set.
243 Receiver.prototype.reset = function() {
244 if (this.dead) return;
// State fields return to "no fragmented operation".
246 activeFragmentedOperation: null,
250 fragmentedOperation: false
// Both pools are reset with `true`.
252 this.fragmentedBufferPool.reset(true);
253 this.unfragmentedBufferPool.reset(true);
// Drop any pending read.
254 this.expectOffset = 0;
255 this.expectBuffer = null;
256 this.expectHandler = null;
// Discard fragments collected so far.
258 this.currentMessage = [];
/**
 * Unmask received data.
 *
 * When both a mask and a buffer are given, the buffer is passed to
 * bufferUtil.unmask together with the mask.
 *
 * @param {Buffer} mask masking key, or null/undefined for unmasked data
 * @param {Buffer} buf payload, may be null/undefined
 * @param {Boolean} binary truthy returns `buf` itself (even when it is
 *   null/undefined); falsy returns its utf8 string form, or '' when there
 *   is no buffer
 * @return {Buffer|String}
 */
Receiver.prototype.unmask = function (mask, buf, binary) {
  var hasPayload = buf != null;
  if (hasPayload && mask != null) {
    bufferUtil.unmask(buf, mask);
  }
  if (binary) {
    return buf;
  }
  if (!hasPayload) {
    return '';
  }
  return buf.toString('utf8');
};
274 * Concatenates a list of buffers.
// Joins a list of buffers into one newly allocated Buffer.
279 Receiver.prototype.concatBuffers = function(buffers) {
// Total size is the sum of the individual buffer lengths.
281 for (var i = 0, l = buffers.length; i < l; ++i) length += buffers[i].length;
282 var mergedBuffer = new Buffer(length);
// bufferUtil.merge receives the destination and the source list.
// NOTE(review): presumably copies the sources back to back in list order - confirm in ./BufferUtil.
283 bufferUtil.merge(mergedBuffer, buffers);
// Reports a failure through the onerror callback, forwarding the reason
// text and the protocol error code supplied by the caller.
293 Receiver.prototype.error = function (reason, protocolErrorCode) {
295 this.onerror(reason, protocolErrorCode);
/**
 * Reads an unsigned 16-bit big-endian integer from `this` (a Buffer or any
 * byte-indexed object), so it must be invoked with .call(buffer, start).
 *
 * @param {Number} start index of the most significant byte
 * @return {Number} value in the range 0..0xFFFF
 */
function readUInt16BE(start) {
  var highByte = this[start] << 8;
  var lowByte = this[start + 1];
  return highByte + lowByte;
}
/**
 * Reads an unsigned 32-bit big-endian integer from `this` (a Buffer or any
 * byte-indexed object), so it must be invoked with .call(buffer, start).
 *
 * The most significant byte is scaled by multiplication rather than `<< 24`:
 * JavaScript shift operators produce signed 32-bit results, so a leading
 * byte >= 0x80 would otherwise yield a negative number (and, for the
 * callers in this file, a negative payload length).
 *
 * @param {Number} start index of the most significant byte
 * @return {Number} value in the range 0..0xFFFFFFFF
 */
function readUInt32BE(start) {
  return (this[start] * 0x1000000) +
         (this[start + 1] << 16) +
         (this[start + 2] << 8) +
         this[start + 3];
}
/**
 * Copies the first `length` bytes of srcBuffer into dstBuffer, starting at
 * dstOffset in the destination.
 *
 * Lengths that are exactly one of the integers 1..16 are copied with plain
 * indexed assignments, highest index first. Every other length (0, above 16,
 * or not an integer number) is delegated to srcBuffer.copy().
 *
 * @param {Number} length number of bytes to copy
 * @param {Buffer} srcBuffer source, read from index 0
 * @param {Buffer} dstBuffer destination
 * @param {Number} dstOffset index in dstBuffer of the first byte written
 */
function fastCopy(length, srcBuffer, dstBuffer, dstOffset) {
  var isShortRun = typeof length === 'number' &&
                   length % 1 === 0 &&
                   length >= 1 &&
                   length <= 16;
  if (!isShortRun) {
    srcBuffer.copy(dstBuffer, dstOffset, 0, length);
    return;
  }
  for (var index = length - 1; index >= 0; --index) {
    dstBuffer[dstOffset + index] = srcBuffer[index];
  }
}
// Start step of the handler whose finish step emits ontext: works out the
// payload length from the header and passes it to opcodes['1'].getData.
344 start: function(data) {
// Low seven bits of byte 1: below 126 is the length itself, 126 and 127 announce an extended length.
347 var firstLength = data[1] & 0x7f;
348 if (firstLength < 126) {
349 opcodes['1'].getData.call(self, firstLength);
// 126: the length is in the next two header bytes.
351 else if (firstLength == 126) {
352 self.expectHeader(2, function(data) {
353 opcodes['1'].getData.call(self, readUInt16BE.call(data, 0));
// 127: eight more header bytes; the first four must be zero and the last four are used as the length.
356 else if (firstLength == 127) {
357 self.expectHeader(8, function(data) {
358 if (readUInt32BE.call(data, 0) != 0) {
359 self.error('packets with length spanning more than 32 bit is currently not supported', 1008);
362 opcodes['1'].getData.call(self, readUInt32BE.call(data, 4));
// Data step: waits for `length` payload bytes and passes them to
// opcodes['1'].finish.
366 getData: function(length) {
// Masked frames: four more header bytes are awaited before the payload, and finish receives `mask`.
368 if (self.state.masked) {
369 self.expectHeader(4, function(data) {
371 self.expectData(length, function(data) {
372 opcodes['1'].finish.call(self, mask, data);
// Unmasked frames: finish receives a null mask.
377 self.expectData(length, function(data) {
378 opcodes['1'].finish.call(self, null, data);
// Finish step: collects the unmasked payload and, on the last fragment,
// validates the assembled message as UTF-8 and emits it through ontext.
382 finish: function(mask, data) {
// binary=true makes unmask return the buffer instead of a decoded string.
383 var packet = this.unmask(mask, data, true);
384 if (packet != null) this.currentMessage.push(packet);
385 if (this.state.lastFragment) {
// Validation runs on the whole message, not on individual fragments.
386 var messageBuffer = this.concatBuffers(this.currentMessage);
387 if (!Validation.isValidUTF8(messageBuffer)) {
388 this.error('invalid utf8 sequence', 1007);
// The callback gets the decoded text plus the masked flag and the raw buffer.
391 this.ontext(messageBuffer.toString('utf8'), {masked: this.state.masked, buffer: messageBuffer});
392 this.currentMessage = [];
// Start step of the handler whose finish step emits onbinary: works out the
// payload length from the header and passes it to opcodes['2'].getData.
399 start: function(data) {
// Low seven bits of byte 1: below 126 is the length itself, 126 and 127 announce an extended length.
402 var firstLength = data[1] & 0x7f;
403 if (firstLength < 126) {
404 opcodes['2'].getData.call(self, firstLength);
// 126: the length is in the next two header bytes.
406 else if (firstLength == 126) {
407 self.expectHeader(2, function(data) {
408 opcodes['2'].getData.call(self, readUInt16BE.call(data, 0));
// 127: eight more header bytes; the first four must be zero and the last four are used as the length.
411 else if (firstLength == 127) {
412 self.expectHeader(8, function(data) {
413 if (readUInt32BE.call(data, 0) != 0) {
414 self.error('packets with length spanning more than 32 bit is currently not supported', 1008);
// NOTE(review): readUInt32BE declares a single parameter, so the trailing `true` is ignored.
417 opcodes['2'].getData.call(self, readUInt32BE.call(data, 4, true));
// Data step: waits for `length` payload bytes and passes them to
// opcodes['2'].finish.
421 getData: function(length) {
// Masked frames: four more header bytes are awaited before the payload, and finish receives `mask`.
423 if (self.state.masked) {
424 self.expectHeader(4, function(data) {
426 self.expectData(length, function(data) {
427 opcodes['2'].finish.call(self, mask, data);
// Unmasked frames: finish receives a null mask.
432 self.expectData(length, function(data) {
433 opcodes['2'].finish.call(self, null, data);
// Finish step: collects the unmasked payload and, on the last fragment,
// emits the assembled message through onbinary.
437 finish: function(mask, data) {
// binary=true makes unmask return the buffer instead of a decoded string.
438 var packet = this.unmask(mask, data, true);
439 if (packet != null) this.currentMessage.push(packet);
440 if (this.state.lastFragment) {
441 var messageBuffer = this.concatBuffers(this.currentMessage);
// The merged buffer is passed both as the message and as the `buffer` option.
442 this.onbinary(messageBuffer, {masked: this.state.masked, buffer: messageBuffer});
443 this.currentMessage = [];
// Start step of the handler whose finish step emits onclose: rejects
// fragmented frames and payloads above 125 bytes, otherwise continues with
// opcodes['8'].getData.
450 start: function(data) {
// A frame without the last-fragment flag is reported with code 1002.
452 if (self.state.lastFragment == false) {
453 self.error('fragmented close is not supported', 1002);
// Only the plain 7-bit length form (values below 126) is accepted here.
458 var firstLength = data[1] & 0x7f;
459 if (firstLength < 126) {
460 opcodes['8'].getData.call(self, firstLength);
463 self.error('control frames cannot have more than 125 bytes of data', 1002);
// Data step: waits for `length` payload bytes and passes them to
// opcodes['8'].finish.
466 getData: function(length) {
// Masked frames: four more header bytes are awaited before the payload, and finish receives `mask`.
468 if (self.state.masked) {
469 self.expectHeader(4, function(data) {
471 self.expectData(length, function(data) {
472 opcodes['8'].finish.call(self, mask, data);
// Unmasked frames: finish receives a null mask.
477 self.expectData(length, function(data) {
478 opcodes['8'].finish.call(self, null, data);
// Finish step: extracts the close code and optional message from the
// payload, validates both, and emits them through onclose.
482 finish: function(mask, data) {
// binary=true makes unmask return the buffer instead of a decoded string.
484 data = self.unmask(mask, data, true);
// A one-byte payload is too short to hold the 16-bit code.
485 if (data && data.length == 1) {
486 self.error('close packets with data must be at least two bytes long', 1002);
// The first two payload bytes are the code; 1000 is used when the payload has none.
489 var code = data && data.length > 1 ? readUInt16BE.call(data, 0) : 1000;
490 if (!ErrorCodes.isValidErrorCode(code)) {
491 self.error('invalid error code', 1002);
// Bytes after the code form the message, which must be valid UTF-8.
495 if (data && data.length > 2) {
496 var messageBuffer = data.slice(2);
497 if (!Validation.isValidUTF8(messageBuffer)) {
498 self.error('invalid utf8 sequence', 1007);
501 message = messageBuffer.toString('utf8');
503 this.onclose(code, message, {masked: self.state.masked});
// Start step of the handler whose finish step emits onping: rejects
// fragmented frames and payloads above 125 bytes, otherwise continues with
// opcodes['9'].getData.
509 start: function(data) {
// A frame without the last-fragment flag is reported with code 1002.
511 if (self.state.lastFragment == false) {
512 self.error('fragmented ping is not supported', 1002);
// Only the plain 7-bit length form (values below 126) is accepted here.
517 var firstLength = data[1] & 0x7f;
518 if (firstLength < 126) {
519 opcodes['9'].getData.call(self, firstLength);
522 self.error('control frames cannot have more than 125 bytes of data', 1002);
// Data step: waits for `length` payload bytes and passes them to
// opcodes['9'].finish.
525 getData: function(length) {
// Masked frames: four more header bytes are awaited before the payload, and finish receives `mask`.
527 if (self.state.masked) {
528 self.expectHeader(4, function(data) {
530 self.expectData(length, function(data) {
531 opcodes['9'].finish.call(self, mask, data);
// Unmasked frames: finish receives a null mask.
536 self.expectData(length, function(data) {
537 opcodes['9'].finish.call(self, null, data);
// Finish step: emits the unmasked payload (kept as a buffer, since unmask is
// called with binary=true) through onping, with the masked flag and binary: true.
541 finish: function(mask, data) {
542 this.onping(this.unmask(mask, data, true), {masked: this.state.masked, binary: true});
// Start step of the handler whose finish step emits onpong: rejects
// fragmented frames and payloads above 125 bytes, otherwise continues with
// opcodes['10'].getData.
548 start: function(data) {
// A frame without the last-fragment flag is reported with code 1002.
550 if (self.state.lastFragment == false) {
551 self.error('fragmented pong is not supported', 1002);
// Only the plain 7-bit length form (values below 126) is accepted here.
556 var firstLength = data[1] & 0x7f;
557 if (firstLength < 126) {
558 opcodes['10'].getData.call(self, firstLength);
561 self.error('control frames cannot have more than 125 bytes of data', 1002);
// Data step: waits for `length` payload bytes and passes them to
// opcodes['10'].finish. The outer calls go through `this`, the nested
// callbacks through `self`.
564 getData: function(length) {
// Masked frames: four more header bytes are awaited before the payload, and finish receives `mask`.
566 if (this.state.masked) {
567 this.expectHeader(4, function(data) {
569 self.expectData(length, function(data) {
570 opcodes['10'].finish.call(self, mask, data);
// Unmasked frames: finish receives a null mask.
575 this.expectData(length, function(data) {
576 opcodes['10'].finish.call(self, null, data);
// Finish step: emits the unmasked payload (kept as a buffer, since unmask is
// called with binary=true) through onpong, with the masked flag and binary: true.
580 finish: function(mask, data) {
581 this.onpong(this.unmask(mask, data, true), {masked: this.state.masked, binary: true});