Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
fix(gateway): streaming compressed payload
Browse files Browse the repository at this point in the history
This change simplifies code responsible for streaming response
and makes the streaming actually work by telling the payload compression
stream to flush its content on every read().
(previous version was buffering entire thing in Hapi's compressor memory)

We also do content-type detection based on the beginning of the stream
by peeking at first `fileType.minimumBytes` bytes.

License: MIT
Signed-off-by: Marcin Rataj <lidel@lidel.org>
  • Loading branch information
lidel committed Apr 11, 2019
1 parent 04bbd24 commit c0838d8
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 49 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
"bl": "^3.0.0",
"boom": "^7.2.0",
"bs58": "^4.0.1",
"buffer-peek-stream": "^1.0.1",
"byteman": "^1.3.5",
"cid-tool": "~0.2.0",
"cids": "~0.5.8",
Expand Down
99 changes: 50 additions & 49 deletions src/http/gateway/resources/gateway.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@
const debug = require('debug')
const log = debug('ipfs:http-gateway')
log.error = debug('ipfs:http-gateway:error')
const pull = require('pull-stream')
const pushable = require('pull-pushable')
const toStream = require('pull-stream-to-stream')

const fileType = require('file-type')
const mime = require('mime-types')
const { PassThrough } = require('readable-stream')
const Boom = require('boom')
const peek = require('buffer-peek-stream')

const { resolver } = require('ipfs-http-response')
const PathUtils = require('../utils/path')
Expand All @@ -30,6 +29,20 @@ function detectContentType (ref, chunk) {
return mime.contentType(mimeType)
}

// Enable streaming of compressed payload
// https://github.com/hapijs/hapi/issues/3599
class ResponseStream extends PassThrough {
_read (size) {
super._read(size)
if (this._compressor) {
this._compressor.flush()
}
}
setCompressor (compressor) {
this._compressor = compressor
}
}

module.exports = {
checkCID (request, h) {
if (!request.params.cid) {
Expand Down Expand Up @@ -85,58 +98,46 @@ module.exports = {
return h.redirect(PathUtils.removeTrailingSlash(ref)).permanent(true)
}

return new Promise((resolve, reject) => {
let pusher
let started = false

pull(
ipfs.catPullStream(data.cid),
pull.drain(
chunk => {
if (!started) {
started = true
pusher = pushable()
const res = h.response(toStream.source(pusher).pipe(new PassThrough()))

// Etag maps directly to an identifier for a specific version of a resource
res.header('Etag', `"${data.cid}"`)
const rawStream = ipfs.catReadableStream(data.cid)
const responseStream = new ResponseStream()

// Pass-through Content-Type sniffing over initial bytes
const contentType = await new Promise((resolve, reject) => {
try {
const peekBytes = fileType.minimumBytes
peek(rawStream, peekBytes, (err, streamHead, outputStream) => {
if (err) {
log.error(err)
return reject(err)
}
outputStream.pipe(responseStream)
resolve(detectContentType(ref, streamHead))
})
} catch (err) {
log.error(err)
reject(err)
}
})

// Set headers specific to the immutable namespace
if (ref.startsWith('/ipfs/')) {
res.header('Cache-Control', 'public, max-age=29030400, immutable')
}
const res = h.response(responseStream)

const contentType = detectContentType(ref, chunk)
// Etag maps directly to an identifier for a specific version of a resource
res.header('Etag', `"${data.cid}"`)

log('ref ', ref)
log('mime-type ', contentType)
// Set headers specific to the immutable namespace
if (ref.startsWith('/ipfs/')) {
res.header('Cache-Control', 'public, max-age=29030400, immutable')
}

if (contentType) {
log('writing content-type header')
res.header('Content-Type', contentType)
}
log('ref ', ref)
log('content-type ', contentType)

resolve(res)
}
pusher.push(chunk)
},
err => {
if (err) {
log.error(err)

// We already started flowing, abort the stream
if (started) {
return pusher.end(err)
}

return reject(err)
}
if (contentType) {
log('writing content-type header')
res.header('Content-Type', contentType)
}

pusher.end()
}
)
)
})
return res
},

afterHandler (request, h) {
Expand Down
6 changes: 6 additions & 0 deletions test/gateway/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,15 @@ describe('HTTP Gateway', function () {

expect(res.statusCode).to.equal(200)
expect(res.rawPayload).to.eql(bigFile)
expect(res.headers['x-ipfs-path']).to.equal(`/ipfs/${bigFileHash}`)
expect(res.headers['etag']).to.equal(`"${bigFileHash}"`)
expect(res.headers['cache-control']).to.equal('public, max-age=29030400, immutable')
expect(res.headers['content-type']).to.equal('application/octet-stream')
})

it('load a jpg file', async () => {
const kitty = 'QmW2WQi7j6c7UgJTarActp7tDNikE4B2qXtFCfLPdsgaTQ/cat.jpg'
const kittyDirectCid = 'Qmd286K6pohQcTKYqnS1YhWrCiS4gz7Xi34sdwMe9USZ7u'

const res = await gateway.inject({
method: 'GET',
Expand All @@ -169,6 +174,7 @@ describe('HTTP Gateway', function () {
expect(res.statusCode).to.equal(200)
expect(res.headers['content-type']).to.equal('image/jpeg')
expect(res.headers['x-ipfs-path']).to.equal('/ipfs/' + kitty)
expect(res.headers['etag']).to.equal(`"${kittyDirectCid}"`)
expect(res.headers['cache-control']).to.equal('public, max-age=29030400, immutable')
expect(res.headers.etag).to.equal('"Qmd286K6pohQcTKYqnS1YhWrCiS4gz7Xi34sdwMe9USZ7u"')
expect(res.headers.suborigin).to.equal('ipfs000bafybeidsg6t7ici2osxjkukisd5inixiunqdpq2q5jy4a2ruzdf6ewsqk4')
Expand Down

0 comments on commit c0838d8

Please sign in to comment.