var core = require('mongodb-core');
var crypto = require('crypto');
var shallowClone = require('../utils').shallowClone;
var stream = require('stream');
var util = require('util');

var ERROR_NAMESPACE_NOT_FOUND = 26;

module.exports = GridFSBucketWriteStream;

/**
 * A writable stream that enables you to write buffers to GridFS.
 *
 * Do not instantiate this class directly. Use `openUploadStream()` instead.
 *
 * @class
 * @param {GridFSBucket} bucket Handle for this stream's corresponding bucket
 * @param {string} filename The value of the 'filename' key in the files doc
 * @param {object} [options=null] Optional settings.
 * @param {string|number|object} [options.id=null] Custom file id for the GridFS file.
 * @param {number} [options.chunkSizeBytes=null] The chunk size to use, in bytes
 * @param {number} [options.w=null] The write concern
 * @param {number} [options.wtimeout=null] The write concern timeout
 * @param {number} [options.j=null] The journal write concern
 * @fires GridFSBucketWriteStream#error
 * @fires GridFSBucketWriteStream#finish
 * @return {GridFSBucketWriteStream} a GridFSBucketWriteStream instance.
 */

function GridFSBucketWriteStream(bucket, filename, options) {
  options = options || {};
  this.bucket = bucket;
  this.chunks = bucket.s._chunksCollection;
  this.filename = filename;
  this.files = bucket.s._filesCollection;
  this.options = options;

  this.id = options.id ? options.id : core.BSON.ObjectId();
  this.chunkSizeBytes = this.options.chunkSizeBytes;
  this.bufToStore = new Buffer(this.chunkSizeBytes);
  this.length = 0;
  this.md5 = crypto.createHash('md5');
  this.n = 0;
  this.pos = 0;
  this.state = {
    streamEnd: false,
    outstandingRequests: 0,
    errored: false,
    aborted: false,
    promiseLibrary: this.bucket.s.promiseLibrary
  };

  // Only check (and, if necessary, build) the GridFS indexes once per bucket
  if (!this.bucket.s.calledOpenUploadStream) {
    this.bucket.s.calledOpenUploadStream = true;

    var _this = this;
    checkIndexes(this, function() {
      _this.bucket.s.checkedIndexes = true;
      _this.bucket.emit('index');
    });
  }
}

util.inherits(GridFSBucketWriteStream, stream.Writable);

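// Usage sketch (illustrative, not part of this module): as the doc comment
// above notes, callers obtain this stream from GridFSBucket#openUploadStream()
// rather than constructing it directly. Assuming the driver's exported
// GridFSBucket class, an already-connected `db` handle, and a local file:
//
//   var GridFSBucket = require('mongodb').GridFSBucket;
//   var fs = require('fs');
//
//   var bucket = new GridFSBucket(db, { bucketName: 'songs' });
//   var uploadStream = bucket.openUploadStream('meistersinger.mp3');
//
//   fs.createReadStream('./meistersinger.mp3')
//     .pipe(uploadStream)
//     .on('error', function(error) { console.error(error); })
//     .on('finish', function() {
//       console.log('wrote file with _id', uploadStream.id);
//     });
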
/**
 * An error occurred
 *
 * @event GridFSBucketWriteStream#error
 * @type {Error}
 */

/**
 * `end()` was called and the write stream successfully wrote the file
 * metadata and all the chunks to MongoDB.
 *
 * @event GridFSBucketWriteStream#finish
 * @type {object}
 */

/**
 * Write a buffer to the stream.
 *
 * @method
 * @param {Buffer} chunk Buffer to write
 * @param {String} encoding Optional encoding for the buffer
 * @param {Function} callback Function to call once the chunk is added to the internal buffer, or once the entire chunk has been persisted to MongoDB if this write caused a flush
 * @return {Boolean} False if this write required flushing a chunk to MongoDB. True otherwise.
 */

GridFSBucketWriteStream.prototype.write = function(chunk, encoding, callback) {
  var _this = this;
  return waitForIndexes(this, function() {
    return doWrite(_this, chunk, encoding, callback);
  });
};

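// Usage sketch (illustrative): because write() returns false once a chunk had
// to be flushed to MongoDB, manual writers should wait for the 'drain' event
// before writing more, just as with any Node writable stream. Assuming
// `uploadStream` was obtained from GridFSBucket#openUploadStream():
//
//   function writeNext(buf) {
//     var ok = uploadStream.write(buf, null, function() {
//       // chunk buffered, or fully persisted if this write caused a flush
//     });
//     if (!ok) {
//       uploadStream.once('drain', function() {
//         // safe to continue writing
//       });
//     }
//   }
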
/**
 * Places this write stream into an aborted state (all future writes fail)
 * and deletes all chunks that have already been written.
 *
 * @method
 * @param {GridFSBucket~errorCallback} callback called when the chunks have been removed or an error occurred
 * @return {Promise} if no callback specified
 */

GridFSBucketWriteStream.prototype.abort = function(callback) {
  if (this.state.streamEnd) {
    var error = new Error('Cannot abort a stream that has already completed');
    if (typeof callback == 'function') {
      return callback(error);
    }
    return this.state.promiseLibrary.reject(error);
  }
  if (this.state.aborted) {
    var error = new Error('Cannot call abort() on a stream twice');
    if (typeof callback == 'function') {
      return callback(error);
    }
    return this.state.promiseLibrary.reject(error);
  }
  this.state.aborted = true;
  this.chunks.deleteMany({ files_id: this.id }, function(error) {
    if (typeof callback == 'function') callback(error);
  });
};

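// Usage sketch (illustrative): aborting an in-progress upload removes any
// chunks already written and makes later writes fail. Assuming `uploadStream`
// came from GridFSBucket#openUploadStream():
//
//   uploadStream.abort(function(error) {
//     if (error) {
//       // e.g. the stream already finished, or abort() was called twice
//       return console.error(error);
//     }
//     console.log('upload aborted, chunks for', uploadStream.id, 'deleted');
//   });
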
/**
 * Tells the stream that no more data will be coming in. The stream will
 * persist the remaining data to MongoDB, write the files document, and
 * then emit a 'finish' event.
 *
 * @method
 * @param {Buffer} chunk Buffer to write
 * @param {String} encoding Optional encoding for the buffer
 * @param {Function} callback Function to call when all files and chunks have been persisted to MongoDB
 */

GridFSBucketWriteStream.prototype.end = function(chunk, encoding, callback) {
  var _this = this;
  if (typeof chunk == 'function') {
    callback = chunk, chunk = null, encoding = null;
  } else if (typeof encoding == 'function') {
    callback = encoding, encoding = null;
  }

  if (checkAborted(this, callback)) {
    return;
  }
  this.state.streamEnd = true;

  if (callback) {
    this.once('finish', function(result) {
      callback(null, result);
    });
  }

  // No final chunk: just flush whatever is buffered and finish up
  if (!chunk) {
    waitForIndexes(this, function() {
      writeRemnant(_this);
    });
    return;
  }

  var inputBuf = (Buffer.isBuffer(chunk)) ?
    chunk : new Buffer(chunk, encoding);

  this.write(chunk, encoding, function() {
    writeRemnant(_this);
  });
};

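// Usage sketch (illustrative): end() accepts an optional final chunk and a
// callback that fires once the files document has been written, i.e. at the
// same point the 'finish' event is emitted. Assuming `uploadStream` came from
// GridFSBucket#openUploadStream():
//
//   uploadStream.end(new Buffer('last few bytes'), null, function(error, filesDoc) {
//     if (error) return console.error(error);
//     console.log('stored', filesDoc.filename, 'with length', filesDoc.length);
//   });
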
// Reports an error at most once: prefers the callback when one is provided,
// otherwise emits 'error' on the stream.
function __handleError(_this, error, callback) {
  if (_this.state.errored) {
    return;
  }
  _this.state.errored = true;
  if (callback) {
    return callback(error);
  }
  _this.emit('error', error);
}

// Builds a GridFS chunk document for the given file id, chunk index, and data.
function createChunkDoc(filesId, n, data) {
  return {
    _id: core.BSON.ObjectId(),
    files_id: filesId,
    n: n,
    data: data
  };
}

// Ensures the { files_id: 1, n: 1 } unique index exists on the chunks
// collection, creating it if the collection or the index is missing.
function checkChunksIndex(_this, callback) {
  _this.chunks.listIndexes().toArray(function(error, indexes) {
    if (error) {
      // Collection doesn't exist so create index
      if (error.code === ERROR_NAMESPACE_NOT_FOUND) {
        var index = { files_id: 1, n: 1 };
        _this.chunks.createIndex(index, { background: false, unique: true }, function(error) {
          if (error) {
            return callback(error);
          }
          callback();
        });
        return;
      }
      return callback(error);
    }

    var hasChunksIndex = false;
    indexes.forEach(function(index) {
      if (index.key) {
        var keys = Object.keys(index.key);
        if (keys.length === 2 && index.key.files_id === 1 &&
            index.key.n === 1) {
          hasChunksIndex = true;
        }
      }
    });

    if (hasChunksIndex) {
      callback();
    } else {
      var index = { files_id: 1, n: 1 };
      var indexOptions = getWriteOptions(_this);

      indexOptions.background = false;
      indexOptions.unique = true;

      _this.chunks.createIndex(index, indexOptions, function(error) {
        if (error) {
          return callback(error);
        }
        callback();
      });
    }
  });
}

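// For reference (illustrative): the index ensured above is equivalent to
// running the following against the chunks collection (e.g. 'fs.chunks'),
// which is what GridFS relies on for chunk lookups and duplicate protection:
//
//   db.collection('fs.chunks').createIndex(
//     { files_id: 1, n: 1 },
//     { background: false, unique: true },
//     function(error) { /* ... */ }
//   );
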
// Once the stream has ended and all chunk inserts have been acknowledged,
// writes the files document and emits 'finish'.
function checkDone(_this, callback) {
  if (_this.state.streamEnd &&
      _this.state.outstandingRequests === 0 &&
      !_this.state.errored) {
    var filesDoc = createFilesDoc(_this.id, _this.length, _this.chunkSizeBytes,
      _this.md5.digest('hex'), _this.filename, _this.options.contentType,
      _this.options.aliases, _this.options.metadata);

    if (checkAborted(_this, callback)) {
      return false;
    }

    _this.files.insert(filesDoc, getWriteOptions(_this), function(error) {
      if (error) {
        return __handleError(_this, error, callback);
      }
      _this.emit('finish', filesDoc);
    });

    return true;
  }

  return false;
}

// Ensures the GridFS indexes exist before the first write: skips the check
// when the files collection already has documents, otherwise creates the
// { filename: 1, uploadDate: 1 } index and then delegates to checkChunksIndex().
function checkIndexes(_this, callback) {
  _this.files.findOne({}, { _id: 1 }, function(error, doc) {
    if (error) {
      return callback(error);
    }
    if (doc) {
      return callback();
    }

    _this.files.listIndexes().toArray(function(error, indexes) {
      if (error) {
        // Collection doesn't exist so create index
        if (error.code === ERROR_NAMESPACE_NOT_FOUND) {
          var index = { filename: 1, uploadDate: 1 };
          _this.files.createIndex(index, { background: false }, function(error) {
            if (error) {
              return callback(error);
            }
            checkChunksIndex(_this, callback);
          });
          return;
        }
        return callback(error);
      }

      var hasFileIndex = false;
      indexes.forEach(function(index) {
        var keys = Object.keys(index.key);
        if (keys.length === 2 && index.key.filename === 1 &&
            index.key.uploadDate === 1) {
          hasFileIndex = true;
        }
      });

      if (hasFileIndex) {
        checkChunksIndex(_this, callback);
      } else {
        var index = { filename: 1, uploadDate: 1 };
        var indexOptions = getWriteOptions(_this);

        indexOptions.background = false;

        _this.files.createIndex(index, indexOptions, function(error) {
          if (error) {
            return callback(error);
          }
          checkChunksIndex(_this, callback);
        });
      }
    });
  });
}

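// For reference (illustrative): the files-collection index created above is
// equivalent to the following, with 'fs.files' as the default collection name:
//
//   db.collection('fs.files').createIndex(
//     { filename: 1, uploadDate: 1 },
//     { background: false },
//     function(error) { /* ... */ }
//   );
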
// Builds the GridFS files document that describes the uploaded file.
function createFilesDoc(_id, length, chunkSize, md5, filename, contentType,
    aliases, metadata) {
  var ret = {
    _id: _id,
    length: length,
    chunkSize: chunkSize,
    uploadDate: new Date(),
    md5: md5,
    filename: filename
  };

  if (contentType) {
    ret.contentType = contentType;
  }

  if (aliases) {
    ret.aliases = aliases;
  }

  if (metadata) {
    ret.metadata = metadata;
  }

  return ret;
}

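// For reference (illustrative): a files document produced by createFilesDoc()
// looks roughly like the following; the field values here are made up:
//
//   {
//     _id: ObjectId('...'),
//     length: 600000,
//     chunkSize: 261120,
//     uploadDate: new Date(),
//     md5: '7f1d...',
//     filename: 'meistersinger.mp3',
//     contentType: 'audio/mpeg',   // only if options.contentType was set
//     aliases: ['ms.mp3'],         // only if options.aliases was set
//     metadata: { artist: '...' }  // only if options.metadata was set
//   }
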
// Copies incoming data into the in-memory chunk buffer and flushes full
// chunks to the chunks collection.
function doWrite(_this, chunk, encoding, callback) {
  if (checkAborted(_this, callback)) {
    return false;
  }

  var inputBuf = (Buffer.isBuffer(chunk)) ?
    chunk : new Buffer(chunk, encoding);

  _this.length += inputBuf.length;

  // Input is small enough to fit in our buffer
  if (_this.pos + inputBuf.length < _this.chunkSizeBytes) {
    inputBuf.copy(_this.bufToStore, _this.pos);
    _this.pos += inputBuf.length;

    callback && callback();

    // Note that we reverse the typical semantics of write's return value
    // to be compatible with node's `.pipe()` function.
    // True means client can keep writing.
    return true;
  }

  // Otherwise, buffer is too big for current chunk, so we need to flush
  // to MongoDB.
  var inputBufRemaining = inputBuf.length;
  var spaceRemaining = _this.chunkSizeBytes - _this.pos;
  var numToCopy = Math.min(spaceRemaining, inputBuf.length);
  var outstandingRequests = 0;
  while (inputBufRemaining > 0) {
    var inputBufPos = inputBuf.length - inputBufRemaining;
    inputBuf.copy(_this.bufToStore, _this.pos,
      inputBufPos, inputBufPos + numToCopy);
    _this.pos += numToCopy;
    spaceRemaining -= numToCopy;
    if (spaceRemaining === 0) {
      _this.md5.update(_this.bufToStore);
      var doc = createChunkDoc(_this.id, _this.n, _this.bufToStore);
      ++_this.state.outstandingRequests;
      ++outstandingRequests;

      if (checkAborted(_this, callback)) {
        return false;
      }

      _this.chunks.insert(doc, getWriteOptions(_this), function(error) {
        if (error) {
          return __handleError(_this, error);
        }
        --_this.state.outstandingRequests;
        --outstandingRequests;
        if (!outstandingRequests) {
          _this.emit('drain', doc);
          callback && callback();
          checkDone(_this);
        }
      });

      spaceRemaining = _this.chunkSizeBytes;
      _this.pos = 0;
      ++_this.n;
    }
    inputBufRemaining -= numToCopy;
    numToCopy = Math.min(spaceRemaining, inputBufRemaining);
  }

  // Note that we reverse the typical semantics of write's return value
  // to be compatible with node's `.pipe()` function.
  // False means the client should wait for the 'drain' event.
  return false;
}

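// Worked example (illustrative): with the default chunkSizeBytes of
// 255 * 1024 = 261120 and an empty buffer (pos === 0), a single 600000-byte
// write copies 261120 bytes and flushes chunk n=0, copies another 261120 bytes
// and flushes chunk n=1, and leaves the remaining 77760 bytes buffered
// (pos === 77760); write() then returns false, and 'drain' fires once both
// chunk inserts have been acknowledged.
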
// Translates the stream's write concern options into the options object
// passed to insert/createIndex calls.
function getWriteOptions(_this) {
  var obj = {};
  if (_this.options.writeConcern) {
    var concern = _this.options.writeConcern;
    obj.w = concern.w;
    obj.wtimeout = concern.wtimeout;
    obj.j = concern.j;
  }
  return obj;
}

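// For reference (illustrative): assuming the caller's write concern has been
// collected into options.writeConcern, e.g. { w: 'majority', wtimeout: 5000,
// j: true }, every files/chunks insert and index build above is issued with
// that same { w, wtimeout, j } object.
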
// Defers the callback until the one-time index check has completed.
function waitForIndexes(_this, callback) {
  if (_this.bucket.s.checkedIndexes) {
    return callback(false);
  }

  _this.bucket.once('index', function() {
    callback(true);
  });

  return true;
}

// Flushes whatever is left in the chunk buffer after end() has been called.
function writeRemnant(_this, callback) {
  // Buffer is empty, so don't bother to insert
  if (_this.pos === 0) {
    return checkDone(_this, callback);
  }

  ++_this.state.outstandingRequests;

  // Create a new buffer to make sure the buffer isn't bigger than it needs
  // to be.
  var remnant = new Buffer(_this.pos);
  _this.bufToStore.copy(remnant, 0, 0, _this.pos);
  _this.md5.update(remnant);
  var doc = createChunkDoc(_this.id, _this.n, remnant);

  // If the stream was aborted, do not write remnant
  if (checkAborted(_this, callback)) {
    return false;
  }

  _this.chunks.insert(doc, getWriteOptions(_this), function(error) {
    if (error) {
      return __handleError(_this, error);
    }
    --_this.state.outstandingRequests;
    checkDone(_this);
  });
}

// Returns true (and reports an error) if the stream has been aborted.
function checkAborted(_this, callback) {
  if (_this.state.aborted) {
    if (typeof callback == 'function') {
      callback(new Error('this stream has been aborted'));
    }
    return true;
  }
  return false;
}