Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
feat: add lazy select (#18)
Browse files Browse the repository at this point in the history
Co-authored-by: Alex Potsides <alex@achingbrain.net>
  • Loading branch information
alanshaw and achingbrain committed Oct 12, 2022
1 parent cfb887b commit d3bff7c
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 2 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@
"it-first": "^1.0.6",
"it-handshake": "^4.0.1",
"it-length-prefixed": "^8.0.2",
"it-merge": "^1.0.4",
"it-pipe": "^2.0.3",
"it-pushable": "^3.0.0",
"it-reader": "^6.0.1",
Expand Down
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ export interface MultistreamSelectInit extends AbortOptions {
writeBytes?: boolean
}

export { select } from './select.js'
export { select, lazySelect } from './select.js'
export { handle } from './handle.js'
58 changes: 57 additions & 1 deletion src/select.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import { handshake } from 'it-handshake'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { PROTOCOL_ID } from './index.js'
import type { Duplex } from 'it-stream-types'
import type { Uint8ArrayList } from 'uint8arraylist'
import { Uint8ArrayList } from 'uint8arraylist'
import { pushable } from 'it-pushable'
import merge from 'it-merge'
import { reader } from 'it-reader'
import type { ByteArrayInit, ByteListInit, MultistreamSelectInit, ProtocolStream } from './index.js'

const log = logger('libp2p:mss:select')
Expand Down Expand Up @@ -58,3 +61,56 @@ export async function select (stream: Duplex<any>, protocols: string | string[],
rest()
throw errCode(new Error('protocol selection failed'), 'ERR_UNSUPPORTED_PROTOCOL')
}

/**
* Lazily negotiates a protocol.
*
* It *does not* block writes waiting for the other end to respond. Instead, it
* simply assumes the negotiation went successfully and starts writing data.
*
* Use when it is known that the receiver supports the desired protocol.
*/
export function lazySelect (stream: Duplex<Uint8Array>, protocol: string): ProtocolStream<Uint8Array>
export function lazySelect (stream: Duplex<Uint8ArrayList, Uint8ArrayList | Uint8Array>, protocol: string): ProtocolStream<Uint8ArrayList, Uint8ArrayList | Uint8Array>
export function lazySelect (stream: Duplex<any>, protocol: string): ProtocolStream<any> {
// This is a signal to write the multistream headers if the consumer tries to
// read from the source
const negotiateTrigger = pushable()
let negotiated = false
return {
stream: {
sink: async source => await stream.sink((async function * () {
let first = true
for await (const chunk of merge(source, negotiateTrigger)) {
if (first) {
first = false
negotiated = true
negotiateTrigger.end()
const p1 = uint8ArrayFromString(PROTOCOL_ID)
const p2 = uint8ArrayFromString(protocol)
const list = new Uint8ArrayList(multistream.encode(p1), multistream.encode(p2))
if (chunk.length > 0) list.append(chunk)
yield * list
} else {
yield chunk
}
}
})()),
source: (async function * () {
if (!negotiated) negotiateTrigger.push(new Uint8Array())
const byteReader = reader(stream.source)
let response = await multistream.readString(byteReader)
if (response === PROTOCOL_ID) {
response = await multistream.readString(byteReader)
}
if (response !== protocol) {
throw errCode(new Error('protocol selection failed'), 'ERR_UNSUPPORTED_PROTOCOL')
}
for await (const chunk of byteReader) {
yield * chunk
}
})()
},
protocol
}
}
15 changes: 15 additions & 0 deletions test/dialer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,19 @@ describe('Dialer', () => {
await expect(mss.select(duplex, protocol)).to.eventually.be.rejected().with.property('code', 'ERR_UNSUPPORTED_PROTOCOL')
})
})

describe('dialer.lazySelect', () => {
it('should lazily select a single protocol', async () => {
const protocol = '/echo/1.0.0'
const duplex = pair()

const selection = mss.lazySelect(duplex, protocol)
expect(selection.protocol).to.equal(protocol)

// Ensure stream is usable after selection
const input = [randomBytes(10), randomBytes(64), randomBytes(3)]
const output = await pipe(input, selection.stream, async (source) => await all(source))
expect(new Uint8ArrayList(...output).slice()).to.eql(new Uint8ArrayList(...input).slice())
})
})
})
43 changes: 43 additions & 0 deletions test/integration.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,47 @@ describe('Dialer and Listener integration', () => {
])
expect(new Uint8ArrayList(...output[0]).slice()).to.eql(new Uint8ArrayList(...input).slice())
})

it('should handle and lazySelect', async () => {
const protocol = '/echo/1.0.0'
const pair = duplexPair()

const dialerSelection = mss.lazySelect(pair[0], protocol)
expect(dialerSelection.protocol).to.equal(protocol)

// Ensure stream is usable after selection
const input = [new Uint8ArrayList(randomBytes(10), randomBytes(64), randomBytes(3))]
// Since the stream is lazy, we need to write to it before handling
const dialerOutPromise = pipe(input, dialerSelection.stream, async source => await all(source))

const listenerSelection = await mss.handle(pair[1], protocol)
expect(listenerSelection.protocol).to.equal(protocol)

await pipe(listenerSelection.stream, listenerSelection.stream)

const dialerOut = await dialerOutPromise
expect(new Uint8ArrayList(...dialerOut).slice()).to.eql(new Uint8ArrayList(...input).slice())
})

it('should abort an unhandled lazySelect', async () => {
const protocol = '/echo/1.0.0'
const pair = duplexPair()

const dialerSelection = mss.lazySelect(pair[0], protocol)
expect(dialerSelection.protocol).to.equal(protocol)

// Ensure stream is usable after selection
const input = [new Uint8ArrayList(randomBytes(10), randomBytes(64), randomBytes(3))]
// Since the stream is lazy, we need to write to it before handling
const dialerResultPromise = pipe(input, dialerSelection.stream, async source => await all(source))

// The error message from this varies depending on how much data got
// written when the dialer receives the `na` response and closes the
// stream, so we just assert that this rejects.
await expect(mss.handle(pair[1], '/unhandled/1.0.0')).to.eventually.be.rejected()

// Dialer should fail to negotiate the single protocol
await expect(dialerResultPromise).to.eventually.be.rejected()
.with.property('code', 'ERR_UNSUPPORTED_PROTOCOL')
})
})

0 comments on commit d3bff7c

Please sign in to comment.