From 7d5c4767cc9c938d261a23e1b08e041a7bd9e507 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Fri, 22 Jun 2018 17:44:46 +0200 Subject: [PATCH] return an error when there are insufficient bytes add tests for multireading and fix logic --- index.js | 56 ++++++++++++++++++++++++++++++++++++---------------- state.js | 16 +++++++++------ test/read.js | 34 +++++++++++++++++++++++++++++-- 3 files changed, 81 insertions(+), 25 deletions(-) diff --git a/index.js b/index.js index ece86c2..39ea4fa 100644 --- a/index.js +++ b/index.js @@ -9,19 +9,17 @@ function isFunction (f) { return 'function' === typeof f } -function maxDelay(fn, delay) { - if(!delay) return fn - return function (a, cb) { - var timer = setTimeout(function () { - fn(new Error('pull-reader: read exceeded timeout'), cb) - }, delay) - fn(a, function (err, value) { - clearTimeout(timer) - cb(err, value) - }) - - } +function didOverread (state) { + // if data has been added to the state + // and more data than we have available is requested + // and all available data has not been read; + // an overread has occurred + if (state.total > 0 && state.wants - state.read > state.total && state.total !== state.read) return true + return false +} +function getOverreadError (state) { + return new Error('attempted to read '+state.wants+' of '+state.total+' bytes') } module.exports = function (timeout) { @@ -29,6 +27,32 @@ module.exports = function (timeout) { var queue = [], read, readTimed, reading = false var state = State(), ended, streaming, abort + function maxDelay(fn, delay) { + if(!delay) { + return function (a, cb) { + fn(a, function (err, value) { + if (err === true && didOverread(state)) { + return cb(getOverreadError(state)) + } + cb(err, value) + }) + } + } + return function (a, cb) { + var timer = setTimeout(function () { + fn(new Error('pull-reader: read exceeded timeout'), cb) + }, delay) + if (didOverread(state)) { + clearTimeout(timer) + return fn(getOverreadError(state), cb) + } + fn(a, function (err, value) { + clearTimeout(timer) + cb(err, value) + }) + } + } + function drain () { while (queue.length) { if(null == queue[0].length && state.has(1)) { @@ -88,6 +112,9 @@ module.exports = function (timeout) { } reader.read = function (len, _timeout, cb) { + if(isInteger(len)) { + state.wants += len + } if(isFunction(_timeout)) cb = _timeout, _timeout = timeout if(isFunction(cb)) { @@ -115,8 +142,3 @@ module.exports = function (timeout) { return reader } - - - - - diff --git a/state.js b/state.js index 4c2f700..1fa5d8f 100644 --- a/state.js +++ b/state.js @@ -12,11 +12,15 @@ module.exports = function () { return { length: length, + total: 0, data: this, + wants: 0, + read: 0, add: function (data) { if(!Buffer.isBuffer(data)) throw new Error('data must be a buffer, was: ' + JSON.stringify(data)) this.length = length = length + data.length + this.total += data.length buffers.push(data) return this }, @@ -24,7 +28,7 @@ module.exports = function () { if(null == n) return length > 0 return length >= n }, - get: function (n) { + _get: function (n) { var _length if(n == null || n === length) { length = 0 @@ -62,12 +66,12 @@ module.exports = function () { } else throw new Error('could not get ' + n + ' bytes') + }, + get: function(n) { + var b = this._get(n) + this.read += b.length + return b } } } - - - - - diff --git a/test/read.js b/test/read.js index 4a2b69f..da1fdae 100644 --- a/test/read.js +++ b/test/read.js @@ -181,7 +181,7 @@ tape('if streaming, the stream should abort', function (t) { tape('abort stream once in streaming mode', function (t) { - var reader = Reader(), err = new Error('intended') + var reader = Reader() pull(Hang(), reader) @@ -232,7 +232,6 @@ tape('timeout does not apply to the rest of the stream', function (t) { pull( reader.read(), pull.collect(function (err, ary) { - console.log(err) t.notOk(err) t.equal(Buffer.concat(ary).toString(), 'hello world') t.end() @@ -241,6 +240,37 @@ tape('timeout does not apply to the rest of the stream', function (t) { }) +tape('overreading results in an error', function (t) { + var corruptedBytes = crypto.randomBytes(10) + + pull( + pull.values([corruptedBytes]), + reader = Reader(20e3) + ) + + reader.read(11, function(_err) { + t.ok(_err) + t.equal(_err.message, 'attempted to read 11 of 10 bytes') + t.end() + }) +}) + + +tape('overreading with multiple reads results in an error', function (t) { + var corruptedBytes = crypto.randomBytes(10) + pull( + pull.values([corruptedBytes]), + reader = Reader() + ) + reader.read(1, function(err) { + t.notOk(err) + reader.read(100, function(_err) { + t.ok(_err) + t.equal(_err.message, 'attempted to read 101 of 10 bytes') + t.end() + }) + }) +})