diff --git a/package.json b/package.json index 242fff2c13..981a2e3db3 100644 --- a/package.json +++ b/package.json @@ -84,6 +84,7 @@ "bl": "^3.0.0", "boom": "^7.2.0", "bs58": "^4.0.1", + "buffer-peek-stream": "^1.0.1", "byteman": "^1.3.5", "cid-tool": "~0.2.0", "cids": "~0.5.8", diff --git a/src/http/gateway/resources/gateway.js b/src/http/gateway/resources/gateway.js index cd2ce1c433..6f1d324675 100644 --- a/src/http/gateway/resources/gateway.js +++ b/src/http/gateway/resources/gateway.js @@ -3,13 +3,12 @@ const debug = require('debug') const log = debug('ipfs:http-gateway') log.error = debug('ipfs:http-gateway:error') -const pull = require('pull-stream') -const pushable = require('pull-pushable') -const toStream = require('pull-stream-to-stream') + const fileType = require('file-type') const mime = require('mime-types') const { PassThrough } = require('readable-stream') const Boom = require('boom') +const peek = require('buffer-peek-stream') const { resolver } = require('ipfs-http-response') const PathUtils = require('../utils/path') @@ -30,6 +29,20 @@ function detectContentType (ref, chunk) { return mime.contentType(mimeType) } +// Enable streaming of compressed payload +// https://github.com/hapijs/hapi/issues/3599 +class ResponseStream extends PassThrough { + _read (size) { + super._read(size) + if (this._compressor) { + this._compressor.flush() + } + } + setCompressor (compressor) { + this._compressor = compressor + } +} + module.exports = { checkCID (request, h) { if (!request.params.cid) { @@ -85,58 +98,46 @@ module.exports = { return h.redirect(PathUtils.removeTrailingSlash(ref)).permanent(true) } - return new Promise((resolve, reject) => { - let pusher - let started = false - - pull( - ipfs.catPullStream(data.cid), - pull.drain( - chunk => { - if (!started) { - started = true - pusher = pushable() - const res = h.response(toStream.source(pusher).pipe(new PassThrough())) - - // Etag maps directly to an identifier for a specific version of a resource - res.header('Etag', `"${data.cid}"`) + const rawStream = ipfs.catReadableStream(data.cid) + const responseStream = new ResponseStream() + + // Pass-through Content-Type sniffing over initial bytes + const contentType = await new Promise((resolve, reject) => { + try { + const peekBytes = fileType.minimumBytes + peek(rawStream, peekBytes, (err, streamHead, outputStream) => { + if (err) { + log.error(err) + return reject(err) + } + outputStream.pipe(responseStream) + resolve(detectContentType(ref, streamHead)) + }) + } catch (err) { + log.error(err) + reject(err) + } + }) - // Set headers specific to the immutable namespace - if (ref.startsWith('/ipfs/')) { - res.header('Cache-Control', 'public, max-age=29030400, immutable') - } + const res = h.response(responseStream) - const contentType = detectContentType(ref, chunk) + // Etag maps directly to an identifier for a specific version of a resource + res.header('Etag', `"${data.cid}"`) - log('ref ', ref) - log('mime-type ', contentType) + // Set headers specific to the immutable namespace + if (ref.startsWith('/ipfs/')) { + res.header('Cache-Control', 'public, max-age=29030400, immutable') + } - if (contentType) { - log('writing content-type header') - res.header('Content-Type', contentType) - } + log('ref ', ref) + log('content-type ', contentType) - resolve(res) - } - pusher.push(chunk) - }, - err => { - if (err) { - log.error(err) - - // We already started flowing, abort the stream - if (started) { - return pusher.end(err) - } - - return reject(err) - } + if (contentType) { + log('writing content-type header') + res.header('Content-Type', contentType) + } - pusher.end() - } - ) - ) - }) + return res }, afterHandler (request, h) { diff --git a/test/gateway/index.js b/test/gateway/index.js index c5f9bd0be6..8900eaf353 100644 --- a/test/gateway/index.js +++ b/test/gateway/index.js @@ -156,10 +156,15 @@ describe('HTTP Gateway', function () { expect(res.statusCode).to.equal(200) expect(res.rawPayload).to.eql(bigFile) + expect(res.headers['x-ipfs-path']).to.equal(`/ipfs/${bigFileHash}`) + expect(res.headers['etag']).to.equal(`"${bigFileHash}"`) + expect(res.headers['cache-control']).to.equal('public, max-age=29030400, immutable') + expect(res.headers['content-type']).to.equal('application/octet-stream') }) it('load a jpg file', async () => { const kitty = 'QmW2WQi7j6c7UgJTarActp7tDNikE4B2qXtFCfLPdsgaTQ/cat.jpg' + const kittyDirectCid = 'Qmd286K6pohQcTKYqnS1YhWrCiS4gz7Xi34sdwMe9USZ7u' const res = await gateway.inject({ method: 'GET', @@ -169,6 +174,7 @@ describe('HTTP Gateway', function () { expect(res.statusCode).to.equal(200) expect(res.headers['content-type']).to.equal('image/jpeg') expect(res.headers['x-ipfs-path']).to.equal('/ipfs/' + kitty) + expect(res.headers['etag']).to.equal(`"${kittyDirectCid}"`) expect(res.headers['cache-control']).to.equal('public, max-age=29030400, immutable') expect(res.headers.etag).to.equal('"Qmd286K6pohQcTKYqnS1YhWrCiS4gz7Xi34sdwMe9USZ7u"') expect(res.headers.suborigin).to.equal('ipfs000bafybeidsg6t7ici2osxjkukisd5inixiunqdpq2q5jy4a2ruzdf6ewsqk4')