Skip to content

Commit

Permalink
return an error when there are insufficient bytes
Browse files Browse the repository at this point in the history
add tests for multireading and fix logic
  • Loading branch information
jacobheun committed Jul 5, 2018
1 parent e079954 commit 7d5c476
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 25 deletions.
56 changes: 39 additions & 17 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,50 @@ function isFunction (f) {
return 'function' === typeof f
}

function maxDelay(fn, delay) {
if(!delay) return fn
return function (a, cb) {
var timer = setTimeout(function () {
fn(new Error('pull-reader: read exceeded timeout'), cb)
}, delay)
fn(a, function (err, value) {
clearTimeout(timer)
cb(err, value)
})

}
function didOverread (state) {
// if data has been added to the state
// and more data than we have available is requested
// and all available data has not been read;
// an overread has occurred
if (state.total > 0 && state.wants - state.read > state.total && state.total !== state.read) return true
return false
}

function getOverreadError (state) {
return new Error('attempted to read '+state.wants+' of '+state.total+' bytes')
}

module.exports = function (timeout) {

var queue = [], read, readTimed, reading = false
var state = State(), ended, streaming, abort

function maxDelay(fn, delay) {
if(!delay) {
return function (a, cb) {
fn(a, function (err, value) {
if (err === true && didOverread(state)) {
return cb(getOverreadError(state))
}
cb(err, value)
})
}
}
return function (a, cb) {
var timer = setTimeout(function () {
fn(new Error('pull-reader: read exceeded timeout'), cb)
}, delay)
if (didOverread(state)) {
clearTimeout(timer)
return fn(getOverreadError(state), cb)
}
fn(a, function (err, value) {
clearTimeout(timer)
cb(err, value)
})
}
}

function drain () {
while (queue.length) {
if(null == queue[0].length && state.has(1)) {
Expand Down Expand Up @@ -88,6 +112,9 @@ module.exports = function (timeout) {
}

reader.read = function (len, _timeout, cb) {
if(isInteger(len)) {
state.wants += len
}
if(isFunction(_timeout))
cb = _timeout, _timeout = timeout
if(isFunction(cb)) {
Expand Down Expand Up @@ -115,8 +142,3 @@ module.exports = function (timeout) {

return reader
}





16 changes: 10 additions & 6 deletions state.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,23 @@ module.exports = function () {

return {
length: length,
total: 0,
data: this,
wants: 0,
read: 0,
add: function (data) {
if(!Buffer.isBuffer(data))
throw new Error('data must be a buffer, was: ' + JSON.stringify(data))
this.length = length = length + data.length
this.total += data.length
buffers.push(data)
return this
},
has: function (n) {
if(null == n) return length > 0
return length >= n
},
get: function (n) {
_get: function (n) {
var _length
if(n == null || n === length) {
length = 0
Expand Down Expand Up @@ -62,12 +66,12 @@ module.exports = function () {
}
else
throw new Error('could not get ' + n + ' bytes')
},
get: function(n) {
var b = this._get(n)
this.read += b.length
return b
}
}

}





34 changes: 32 additions & 2 deletions test/read.js
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ tape('if streaming, the stream should abort', function (t) {

tape('abort stream once in streaming mode', function (t) {

var reader = Reader(), err = new Error('intended')
var reader = Reader()

pull(Hang(), reader)

Expand Down Expand Up @@ -232,7 +232,6 @@ tape('timeout does not apply to the rest of the stream', function (t) {
pull(
reader.read(),
pull.collect(function (err, ary) {
console.log(err)
t.notOk(err)
t.equal(Buffer.concat(ary).toString(), 'hello world')
t.end()
Expand All @@ -241,6 +240,37 @@ tape('timeout does not apply to the rest of the stream', function (t) {
})


tape('overreading results in an error', function (t) {
var corruptedBytes = crypto.randomBytes(10)

pull(
pull.values([corruptedBytes]),
reader = Reader(20e3)
)

reader.read(11, function(_err) {
t.ok(_err)
t.equal(_err.message, 'attempted to read 11 of 10 bytes')
t.end()
})
})


tape('overreading with multiple reads results in an error', function (t) {
var corruptedBytes = crypto.randomBytes(10)

pull(
pull.values([corruptedBytes]),
reader = Reader()
)

reader.read(1, function(err) {
t.notOk(err)
reader.read(100, function(_err) {
t.ok(_err)
t.equal(_err.message, 'attempted to read 101 of 10 bytes')
t.end()
})
})
})

0 comments on commit 7d5c476

Please sign in to comment.