From 06a9e270f811de87e0a10f09276b77de08aabc26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Uhl=C3=AD=C5=99?= Date: Thu, 25 Jun 2020 15:15:42 +0200 Subject: [PATCH] feat: store blocks under multihash key (#211) This is related to https://github.com/ipfs/js-ipfs/issues/2415 Breaking changes: - Repo version incremented to `8`, requires a migration - Blocks are now stored using the multihash, not the full CID - `repo.blocks.query({})` now returns an async iterator that yields blocks - `repo.blocks.query({ keysOnly: true })` now returns an async iterator that yields CIDs - Those CIDs are v1 with the raw codec Co-authored-by: achingbrain --- README.md | 57 ++++++++++---- package.json | 7 +- src/blockstore-utils.js | 7 +- src/blockstore.js | 142 +++++++++++++++++----------------- src/constants.js | 2 +- src/index.js | 12 +-- src/lock-memory.js | 4 +- src/lock.js | 2 +- test/blockstore-test.js | 60 +++++++++++++- test/blockstore-utils-test.js | 17 +++- test/interop-test.js | 2 +- test/repo-test.js | 4 +- 12 files changed, 208 insertions(+), 108 deletions(-) diff --git a/README.md b/README.md index 6bb88de0..d44bdd3d 100644 --- a/README.md +++ b/README.md @@ -40,17 +40,20 @@ This is the implementation of the [IPFS repo spec](https://github.com/ipfs/specs - [`Promise repo.get(key)`](#promisebuffer-repogetkey) - [Blocks](#blocks) - [`Promise repo.blocks.put(block:Block)`](#promiseblock-repoblocksputblockblock) - - [`AsyncIterator repo.blocks.putMany(source)`](#asynciteratorblock-repoblocksputmanysource) - - [`Promise repo.blocks.get(cid)`](#promisebuffer-repoblocksgetcid) - - [`AsyncIterable repo.blocks.getMany(source)`](#asynciterablebuffer-repoblocksgetmanysource) + - [`AsyncIterator repo.blocks.putMany(source:AsyncIterable)`](#asynciteratorblock-repoblocksputmanysourceasynciterableblock) + - [`Promise repo.blocks.get(cid:CID)`](#promiseblock-repoblocksgetcidcid) + - [`AsyncIterable repo.blocks.getMany(source:AsyncIterable)`](#asynciterableblock-repoblocksgetmanysourceasynciterablecid) + - [`Promise repo.blocks.has (cid:CID)`](#promiseboolean-repoblockshas-cidcid) + - [`Promise repo.blocks.delete (cid:CID)`](#promiseboolean-repoblocksdelete-cidcid) + - [`AsyncIterator repo.blocks.query (query)`](#asynciteratorblockcid-repoblocksquery-query) - [`Promise repo.blocks.delete(cid:CID)`](#promisecid-repoblocksdeletecidcid) - - [`AsyncIterator repo.blocks.deleteMany(source)`](#asynciteratorcid-repoblocksdeletemanysource) + - [`AsyncIterator repo.blocks.deleteMany(source:AsyncIterable)`](#asynciteratorcid-repoblocksdeletemanysourceasynciterablecid) - [Datastore](#datastore) - [`repo.datastore`](#repodatastore) - [Config](#config) - - [`Promise repo.config.set(key:string, value)`](#promise-repoconfigsetkeystring-value) - - [`Promise repo.config.replace(value)`](#promise-repoconfigreplacevalue) - - [`Promise repo.config.get(key:string)`](#promise-repoconfiggetkeystring) + - [`Promise repo.config.set(key:String, value:Object)`](#promise-repoconfigsetkeystring-valueobject) + - [`Promise repo.config.replace(value:Object)`](#promise-repoconfigreplacevalueobject) + - [`Promise repo.config.get(key:String)`](#promise-repoconfiggetkeystring) - [`Promise repo.config.getAll()`](#promiseobject-repoconfiggetall) - [`Promise repo.config.exists()`](#promiseboolean-repoconfigexists) - [Version](#version) @@ -229,31 +232,53 @@ Get a value at the root of the repo * `block` should be of type [Block][] -#### `AsyncIterator repo.blocks.putMany(source)` +#### `AsyncIterator repo.blocks.putMany(source:AsyncIterable)` Put many blocks. * `source` should be an AsyncIterable that yields entries of type [Block][] -#### `Promise repo.blocks.get(cid)` +#### `Promise repo.blocks.get(cid:CID)` Get block. * `cid` is the content id of type [CID][] -#### `AsyncIterable repo.blocks.getMany(source)` +#### `AsyncIterable repo.blocks.getMany(source:AsyncIterable)` -Get block. +Get many blocks * `source` should be an AsyncIterable that yields entries of type [CID][] +#### `Promise repo.blocks.has (cid:CID)` + +Indicate if a block is present for the passed CID + +* `cid` should be of the type [CID][] + +#### `Promise repo.blocks.delete (cid:CID)` + +Deletes a block + +* `cid` should be of the type [CID][] + +#### `AsyncIterator repo.blocks.query (query)` + +Query what blocks are available in blockstore. + +If `query.keysOnly` is true, the returned iterator will yield [CID][]s, otherwise it will yield [Block][]s + +* `query` is a object as specified in [interface-datastore](https://github.com/ipfs/interface-datastore#query). + +Datastore: + #### `Promise repo.blocks.delete(cid:CID)` * `cid` should be of the type [CID][] Delete a block -#### `AsyncIterator repo.blocks.deleteMany(source)` +#### `AsyncIterator repo.blocks.deleteMany(source:AsyncIterable)` * `source` should be an Iterable or AsyncIterable that yields entries of the type [CID][] @@ -269,7 +294,7 @@ This contains a full implementation of [the `interface-datastore` API](https://g Instead of using `repo.set('config')` this exposes an API that allows you to set and get a decoded config object, as well as, in a safe manner, change any of the config values individually. -#### `Promise repo.config.set(key:string, value)` +#### `Promise repo.config.set(key:String, value:Object)` Set a config value. `value` can be any object that is serializable to JSON. @@ -281,11 +306,11 @@ const config = await repo.config.get() assert.equal(config.a.b.c, 'c value') ``` -#### `Promise repo.config.replace(value)` +#### `Promise repo.config.replace(value:Object)` Set the whole config value. `value` can be any object that is serializable to JSON. -#### `Promise repo.config.get(key:string)` +#### `Promise repo.config.get(key:String)` Get a config value. Returned promise resolves to the same type that was set before. @@ -379,7 +404,7 @@ Returned promise resolves to a `boolean` indicating the existence of the lock. ### Migrations -When there is a new repo migration and the version of repo is increased, don't +When there is a new repo migration and the version of the repo is increased, don't forget to propagate the changes into the test repo (`test/test-repo`). **For tools that run mainly in the browser environment, be aware that disabling automatic diff --git a/package.json b/package.json index 734f7ed1..586bc28e 100644 --- a/package.json +++ b/package.json @@ -52,8 +52,7 @@ "it-first": "^1.0.2", "just-range": "^2.1.0", "memdown": "^5.1.0", - "multihashes": "^1.0.1", - "multihashing-async": "^0.8.0", + "multihashing-async": "^1.0.0", "ncp": "^2.0.0", "rimraf": "^3.0.0", "sinon": "^9.0.2" @@ -69,11 +68,11 @@ "debug": "^4.1.0", "err-code": "^2.0.0", "interface-datastore": "^1.0.2", - "ipfs-repo-migrations": "^0.2.1", + "ipfs-repo-migrations": "^1.0.0", "ipfs-utils": "^2.2.0", "ipld-block": "^0.9.1", "it-map": "^1.0.2", - "it-pipe": "^1.1.0", + "it-pushable": "^1.4.0", "just-safe-get": "^2.0.0", "just-safe-set": "^2.1.0", "multibase": "^1.0.1", diff --git a/src/blockstore-utils.js b/src/blockstore-utils.js index eda398c2..fece2f98 100644 --- a/src/blockstore-utils.js +++ b/src/blockstore-utils.js @@ -16,15 +16,18 @@ exports.cidToKey = cid => { throw errcode(new Error('Not a valid cid'), 'ERR_INVALID_CID') } - return new Key('/' + multibase.encode('base32', cid.buffer).toString().slice(1).toUpperCase(), false) + return new Key('/' + multibase.encode('base32', cid.multihash).toString().slice(1).toUpperCase(), false) } /** * Transform a datastore Key instance to a CID + * As Key is a multihash of the CID, it is reconstructed using IPLD's RAW codec. + * Hence it is highly probable that stored CID will differ from a CID retrieved from blockstore. * * @param {Key} key * @returns {CID} */ exports.keyToCid = key => { - return new CID(multibase.decode('b' + key.toString().slice(1).toLowerCase())) + // Block key is of the form / + return new CID(1, 'raw', multibase.decode('b' + key.toString().slice(1).toLowerCase())) } diff --git a/src/blockstore.js b/src/blockstore.js index 01632318..2d347cf0 100644 --- a/src/blockstore.js +++ b/src/blockstore.js @@ -5,7 +5,8 @@ const ShardingStore = core.ShardingDatastore const Block = require('ipld-block') const { cidToKey, keyToCid } = require('./blockstore-utils') const map = require('it-map') -const pipe = require('it-pipe') +const drain = require('it-drain') +const pushable = require('it-pushable') module.exports = async (filestore, options) => { const store = await maybeWithSharding(filestore, options) @@ -23,17 +24,25 @@ function maybeWithSharding (filestore, options) { function createBaseStore (store) { return { /** - * Query the store. + * Query the store * * @param {Object} query * @param {Object} options - * @returns {AsyncIterator} + * @returns {AsyncIterator} */ - async * query (query, options) { // eslint-disable-line require-await - yield * store.query(query, options) + async * query (query, options) { + for await (const { key, value } of store.query(query, options)) { + if (query.keysOnly) { + yield keyToCid(key) + continue + } + + yield new Block(value, keyToCid(key)) + } }, + /** - * Get a single block by CID. + * Get a single block by CID * * @param {CID} cid * @param {Object} options @@ -41,29 +50,13 @@ function createBaseStore (store) { */ async get (cid, options) { const key = cidToKey(cid) - let blockData - try { - blockData = await store.get(key, options) - return new Block(blockData, cid) - } catch (err) { - if (err.code === 'ERR_NOT_FOUND') { - const otherCid = cidToOtherVersion(cid) - - if (!otherCid) { - throw err - } - - const otherKey = cidToKey(otherCid) - const blockData = await store.get(otherKey, options) - await store.put(key, blockData) - return new Block(blockData, cid) - } + const blockData = await store.get(key, options) - throw err - } + return new Block(blockData, cid) }, + /** - * Like get, but for more. + * Like get, but for more * * @param {AsyncIterator} cids * @param {Object} options @@ -74,8 +67,9 @@ function createBaseStore (store) { yield this.get(cid, options) } }, + /** - * Write a single block to the store. + * Write a single block to the store * * @param {Block} block * @param {Object} options @@ -86,59 +80,75 @@ function createBaseStore (store) { throw new Error('invalid block') } - const exists = await this.has(block.cid) + const key = cidToKey(block.cid) + const exists = await store.has(key, options) - if (exists) { - return this.get(block.cid, options) + if (!exists) { + await store.put(key, block.data, options) } - await store.put(cidToKey(block.cid), block.data, options) - return block }, /** - * Like put, but for more. + * Like put, but for more * * @param {AsyncIterable|Iterable} blocks * @param {Object} options * @returns {AsyncIterable} */ async * putMany (blocks, options) { // eslint-disable-line require-await - yield * pipe( - blocks, - (source) => { - // turn them into a key/value pair - return map(source, (block) => { - return { key: cidToKey(block.cid), value: block.data } - }) - }, - (source) => { - // put them into the datastore - return store.putMany(source, options) - }, - (source) => { - // map the returned key/value back into a block - return map(source, ({ key, value }) => { - return new Block(value, keyToCid(key)) - }) + // we cannot simply chain to `store.putMany` because we convert a CID into + // a key based on the multihash only, so we lose the version & codec and + // cannot give the user back the CID they used to create the block, so yield + // to `store.putMany` but return the actual block the user passed in. + // + // nb. we want to use `store.putMany` here so bitswap can control batching + // up block HAVEs to send to the network - if we use multiple `store.put`s + // it will not be able to guess we are about to `store.put` more blocks + const output = pushable() + + // process.nextTick runs on the microtask queue, setImmediate runs on the next + // event loop iteration so is slower. Use process.nextTick if it is available. + const runner = process && process.nextTick ? process.nextTick : setImmediate + + runner(async () => { + try { + await drain(store.putMany(async function * () { + for await (const block of blocks) { + const key = cidToKey(block.cid) + const exists = await store.has(key, options) + + if (!exists) { + yield { key, value: block.data } + } + + // there is an assumption here that after the yield has completed + // the underlying datastore has finished writing the block + output.push(block) + } + }())) + + output.end() + } catch (err) { + output.end(err) } - ) + }) + + yield * output }, + /** - * Does the store contain block with this cid? + * Does the store contain block with this CID? * * @param {CID} cid * @param {Object} options * @returns {Promise} */ - async has (cid, options) { - const exists = await store.has(cidToKey(cid), options) - if (exists) return exists - const otherCid = cidToOtherVersion(cid) - if (!otherCid) return false - return store.has(cidToKey(otherCid), options) + async has (cid, options) { // eslint-disable-line require-await + return store.has(cidToKey(cid), options) }, + /** * Delete a block from the store * @@ -149,6 +159,7 @@ function createBaseStore (store) { async delete (cid, options) { // eslint-disable-line require-await return store.delete(cidToKey(cid), options) }, + /** * Delete a block from the store * @@ -157,12 +168,9 @@ function createBaseStore (store) { * @returns {Promise} */ async * deleteMany (cids, options) { // eslint-disable-line require-await - yield * store.deleteMany((async function * () { - for await (const cid of cids) { - yield cidToKey(cid) - } - }()), options) + yield * store.deleteMany(map(cids, cid => cidToKey(cid)), options) }, + /** * Close the store * @@ -173,11 +181,3 @@ function createBaseStore (store) { } } } - -function cidToOtherVersion (cid) { - try { - return cid.version === 0 ? cid.toV1() : cid.toV0() - } catch (err) { - return null - } -} diff --git a/src/constants.js b/src/constants.js index d680f22a..67c1643d 100644 --- a/src/constants.js +++ b/src/constants.js @@ -1,5 +1,5 @@ 'use strict' module.exports = { - repoVersion: 7 + repoVersion: 8 } diff --git a/src/index.js b/src/index.js index fcafa703..6b818d6b 100644 --- a/src/index.js +++ b/src/index.js @@ -35,8 +35,8 @@ const lockers = { */ class IpfsRepo { /** - * @param {string} repoPath - path where the repo is stored - * @param {object} options - Configuration + * @param {String} repoPath - path where the repo is stored + * @param {Object} options - Configuration */ constructor (repoPath, options) { if (typeof repoPath !== 'string') { @@ -185,7 +185,7 @@ class IpfsRepo { * Creates a lock on the repo if a locker is specified. The lockfile object will * be returned in the callback if one has been created. * - * @param {string} path + * @param {String} path * @returns {Promise} */ async _openLock (path) { @@ -353,8 +353,8 @@ class IpfsRepo { for await (const block of this.blocks.query({})) { count = count.plus(1) size = size - .plus(block.value.byteLength) - .plus(block.key._buf.byteLength) + .plus(block.data.byteLength) + .plus(block.cid.buffer.byteLength) } return { count, size } @@ -365,7 +365,7 @@ async function getSize (queryFn) { const sum = new Big(0) for await (const block of queryFn.query({})) { sum.plus(block.value.byteLength) - .plus(block.key._buf.byteLength) + .plus(block.key.toBuffer().byteLength) } return sum } diff --git a/src/lock-memory.js b/src/lock-memory.js index d01ccced..624d23ff 100644 --- a/src/lock-memory.js +++ b/src/lock-memory.js @@ -12,7 +12,7 @@ const LOCKS = {} /** * Lock the repo in the given dir. * - * @param {string} dir + * @param {String} dir * @returns {Promise} */ exports.lock = async (dir) => { // eslint-disable-line require-await @@ -37,7 +37,7 @@ exports.lock = async (dir) => { // eslint-disable-line require-await /** * Check if the repo in the given directory is locked. * - * @param {string} dir + * @param {String} dir * @returns {bool} */ exports.locked = async (dir) => { // eslint-disable-line require-await diff --git a/src/lock.js b/src/lock.js index dcc49eed..63684836 100644 --- a/src/lock.js +++ b/src/lock.js @@ -22,7 +22,7 @@ const STALE_TIME = 20000 /** * Lock the repo in the given dir. * - * @param {string} dir + * @param {String} dir * @returns {Promise} */ exports.lock = async (dir) => { diff --git a/test/blockstore-test.js b/test/blockstore-test.js index fa419236..a899951c 100644 --- a/test/blockstore-test.js +++ b/test/blockstore-test.js @@ -15,6 +15,13 @@ const drain = require('it-drain') const all = require('it-all') const first = require('it-first') +async function makeBlock () { + const bData = Buffer.from(`hello-${Math.random()}`) + + const hash = await multihashing(bData, 'sha2-256') + return new Block(bData, new CID(hash)) +} + module.exports = (repo) => { describe('blockstore', () => { const blockData = range(100).map((i) => Buffer.from(`hello-${i}-${Math.random()}`)) @@ -66,7 +73,10 @@ module.exports = (repo) => { const hash = await multihashing(d, 'sha2-256') return new Block(d, new CID(hash)) })) - await drain(repo.blocks.putMany(blocks)) + + const put = await all(repo.blocks.putMany(blocks)) + expect(put).to.deep.equal(blocks) + for (const block of blocks) { const block1 = await repo.blocks.get(block.cid) expect(block1).to.be.eql(block) @@ -362,5 +372,53 @@ module.exports = (repo) => { return expect(drain(repo.blocks.deleteMany(['foo']))).to.eventually.be.rejected().with.property('code', 'ERR_INVALID_CID') }) }) + + describe('.query', () => { + let block1 + let block2 + + before(async () => { + block1 = await makeBlock() + block2 = await makeBlock() + + await repo.blocks.put(block1) + await repo.blocks.put(block2) + }) + + it('returns key/values for block data', async () => { + const blocks = await all(repo.blocks.query({})) + const block = blocks.find(block => block.data.toString('base64') === block1.data.toString('base64')) + + expect(block).to.be.ok() + expect(block.cid.multihash).to.deep.equal(block1.cid.multihash) + expect(block.data).to.deep.equal(block1.data) + }) + + it('returns some of the blocks', async () => { + const blocksWithPrefix = await all(repo.blocks.query({ + prefix: cidToKey(block1.cid).toString().substring(0, 10) + })) + const block = blocksWithPrefix.find(block => block.data.toString('base64') === block1.data.toString('base64')) + + expect(block).to.be.ok() + expect(block.cid.multihash).to.deep.equal(block1.cid.multihash) + expect(block.data).to.deep.equal(block1.data) + + const allBlocks = await all(repo.blocks.query({})) + expect(blocksWithPrefix.length).to.be.lessThan(allBlocks.length) + }) + + it('returns only keys', async () => { + const cids = await all(repo.blocks.query({ + keysOnly: true + })) + + expect(cids.length).to.be.greaterThan(0) + + for (const cid of cids) { + expect(CID.isCID(cid)).to.be.true() + } + }) + }) }) } diff --git a/test/blockstore-utils-test.js b/test/blockstore-utils-test.js index 630069e9..d31621c3 100644 --- a/test/blockstore-utils-test.js +++ b/test/blockstore-utils-test.js @@ -9,12 +9,27 @@ const Repo = require('../src') module.exports = () => { describe('blockstore utils', () => { it('converts a CID to a datastore Key and back', () => { - const originalCid = new CID('Qme6KJdKcp85TYbLxuLV7oQzMiLremD7HMoXLZEmgo6Rnh') + // CIDv1 in base32 with IPLD raw codec + const originalCid = new CID('bafkreihkb3vrxxex5zvzkr3s3a6noe223r7jka4ofjy2nkzu27kueg76ii') const key = Repo.utils.blockstore.cidToKey(originalCid) expect(key instanceof Key).to.be.true() const cid = Repo.utils.blockstore.keyToCid(key) expect(cid instanceof CID).to.be.true() expect(originalCid.toString()).to.equal(cid.toString()) }) + + it('converts a CID to base32 encoded key', () => { + // CIDv0 in base58btc with implicit dag-pb codec + const originalCid = new CID('QmQPeNsJPyVWPFDVHb77w8G42Fvo15z4bG2X8D2GhfbSXc') + const key = Repo.utils.blockstore.cidToKey(originalCid) + expect(key instanceof Key).to.be.true() + expect(key.toString()).to.equal('/CIQB4655YD5GLBB7WWEUAHCO6QONU5ICBONAA5JEPBIOEIVZ5RXTIYY') + const cid = Repo.utils.blockstore.keyToCid(key) + expect(cid instanceof CID).to.be.true() + expect('bafkreia6po64b6tfqq73lckadrhpihg2oubaxgqaoushquhcek46y3zumm').to.equal(cid.toString()) + expect(cid.codec).to.equal('raw') + expect(cid.version).to.equal(1) + expect(cid.multihash).to.deep.equal(originalCid.multihash) + }) }) } diff --git a/test/interop-test.js b/test/interop-test.js index ac4d50e8..469408b0 100644 --- a/test/interop-test.js +++ b/test/interop-test.js @@ -2,7 +2,7 @@ 'use strict' const { expect } = require('./utils/chai') -const mh = require('multihashes') +const mh = require('multihashing-async').multihash const CID = require('cids') const Key = require('interface-datastore').Key diff --git a/test/repo-test.js b/test/repo-test.js index 966622c5..48336887 100644 --- a/test/repo-test.js +++ b/test/repo-test.js @@ -56,12 +56,12 @@ module.exports = (repo) => { describe('version', () => { afterEach(async () => { - await repo.version.set(7) + await repo.version.set(8) }) it('get version', async () => { const version = await repo.version.get() - expect(version).to.equal(7) + expect(version).to.equal(8) }) it('set version', async () => {