From 2bd88a8a8ec123bd220fad450645f61aea44258a Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Thu, 3 Aug 2023 16:19:25 +0100 Subject: [PATCH] feat!: close streams gracefully (#57) * feat!: close streams gracefully - Updates all libp2p related deps - Refactors `YamuxStream` class to extend `AbstractStream` similar to other stream muxers - Stream close methods are now async BREAKING CHANGE: stream close methods are now asyc, requires libp2p@0.46.x or later * chore: pr comments * chore: remove readState/writeState as they are not used any more --- package.json | 19 +- src/config.ts | 2 +- src/decode.ts | 2 +- src/index.ts | 2 +- src/muxer.ts | 99 +++++---- src/stream.ts | 358 ++++++++++----------------------- test/bench/comparison.bench.ts | 2 +- test/codec.util.ts | 2 +- test/compliance.spec.ts | 2 +- test/mplex.util.ts | 2 +- test/muxer.spec.ts | 57 +++--- test/stream.spec.ts | 88 ++++---- test/util.ts | 21 +- 13 files changed, 265 insertions(+), 391 deletions(-) diff --git a/package.json b/package.json index d2e0231..6f0b1c1 100644 --- a/package.json +++ b/package.json @@ -171,24 +171,21 @@ "docs": "aegir docs" }, "dependencies": { - "@libp2p/interface-connection": "^5.1.0", - "@libp2p/interface-stream-muxer": "^4.1.2", - "@libp2p/interfaces": "^3.3.2", - "@libp2p/logger": "^2.0.7", + "@libp2p/interface": "^0.1.0", + "@libp2p/logger": "^3.0.0", "abortable-iterator": "^5.0.1", - "any-signal": "^4.1.1", + "it-foreach": "^2.0.3", "it-pipe": "^3.0.1", - "it-pushable": "^3.1.3", + "it-pushable": "^3.2.0", "uint8arraylist": "^2.4.3" }, "devDependencies": { "@dapplion/benchmark": "^0.2.4", - "@libp2p/interface-stream-muxer-compliance-tests": "^7.0.3", - "@libp2p/mplex": "^8.0.3", - "aegir": "^39.0.7", + "@libp2p/interface-compliance-tests": "^4.0.0", + "@libp2p/mplex": "^9.0.0", + "aegir": "^40.0.1", "it-drain": "^3.0.2", "it-pair": "^2.0.6", "it-stream-types": "^2.0.1" - }, - "browser": {} + } } diff --git a/src/config.ts b/src/config.ts index 588b2f6..887e461 100644 --- a/src/config.ts +++ b/src/config.ts @@ -1,4 +1,4 @@ -import { CodeError } from '@libp2p/interfaces/errors' +import { CodeError } from '@libp2p/interface/errors' import { logger, type Logger } from '@libp2p/logger' import { ERR_INVALID_CONFIG, INITIAL_STREAM_WINDOW, MAX_STREAM_WINDOW } from './constants.js' diff --git a/src/decode.ts b/src/decode.ts index 14da242..8433f8f 100644 --- a/src/decode.ts +++ b/src/decode.ts @@ -1,4 +1,4 @@ -import { CodeError } from '@libp2p/interfaces/errors' +import { CodeError } from '@libp2p/interface/errors' import { Uint8ArrayList } from 'uint8arraylist' import { ERR_DECODE_INVALID_VERSION, ERR_DECODE_IN_PROGRESS } from './constants.js' import { type FrameHeader, FrameType, HEADER_LENGTH, YAMUX_VERSION } from './frame.js' diff --git a/src/index.ts b/src/index.ts index d11e3d4..ade3ad8 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,6 +1,6 @@ import { Yamux } from './muxer.js' import type { YamuxMuxerInit } from './muxer.js' -import type { StreamMuxerFactory } from '@libp2p/interface-stream-muxer' +import type { StreamMuxerFactory } from '@libp2p/interface/stream-muxer' export { GoAwayCode } from './frame.js' export function yamux (init: YamuxMuxerInit = {}): () => StreamMuxerFactory { diff --git a/src/muxer.ts b/src/muxer.ts index 118d670..9f3f6ac 100644 --- a/src/muxer.ts +++ b/src/muxer.ts @@ -1,6 +1,6 @@ -import { CodeError } from '@libp2p/interfaces/errors' +import { CodeError } from '@libp2p/interface/errors' +import { logger, type Logger } from '@libp2p/logger' import { abortableSource } from 'abortable-iterator' -import { anySignal, type ClearableSignal } from 'any-signal' import { pipe } from 'it-pipe' import { pushable, type Pushable } from 'it-pushable' import { type Config, defaultConfig, verifyConfig } from './config.js' @@ -9,13 +9,14 @@ import { Decoder } from './decode.js' import { encodeHeader } from './encode.js' import { Flag, type FrameHeader, FrameType, GoAwayCode, stringifyHeader } from './frame.js' import { StreamState, YamuxStream } from './stream.js' -import type { Stream } from '@libp2p/interface-connection' -import type { StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interface-stream-muxer' -import type { Logger } from '@libp2p/logger' +import type { AbortOptions } from '@libp2p/interface' +import type { Stream } from '@libp2p/interface/connection' +import type { StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interface/stream-muxer' import type { Sink, Source } from 'it-stream-types' import type { Uint8ArrayList } from 'uint8arraylist' const YAMUX_PROTOCOL_ID = '/yamux/1.0.0' +const CLOSE_TIMEOUT = 500 export interface YamuxMuxerInit extends StreamMuxerInit, Partial { } @@ -36,12 +37,15 @@ export class Yamux implements StreamMuxerFactory { } } +export interface CloseOptions extends AbortOptions { + reason?: GoAwayCode +} + export class YamuxMuxer implements StreamMuxer { protocol = YAMUX_PROTOCOL_ID source: Pushable sink: Sink, Promise> - private readonly _init: YamuxMuxerInit private readonly config: Config private readonly log?: Logger @@ -75,7 +79,6 @@ export class YamuxMuxer implements StreamMuxer { private readonly onStreamEnd?: (stream: Stream) => void constructor (init: YamuxMuxerInit) { - this._init = init this.client = init.direction === 'outbound' this.config = { ...defaultConfig, ...init } this.log = this.config.log @@ -89,22 +92,19 @@ export class YamuxMuxer implements StreamMuxer { this._streams = new Map() this.source = pushable({ - onEnd: (err?: Error): void => { + onEnd: (): void => { this.log?.trace('muxer source ended') - this.close(err) + + this._streams.forEach(stream => { + stream.destroy() + }) } }) this.sink = async (source: Source): Promise => { - let signal: ClearableSignal | undefined - - if (this._init.signal != null) { - signal = anySignal([this.closeController.signal, this._init.signal]) - } - source = abortableSource( source, - signal ?? this.closeController.signal, + this.closeController.signal, { returnOnAbort: true } ) @@ -133,15 +133,15 @@ export class YamuxMuxer implements StreamMuxer { } error = err as Error - } finally { - if (signal != null) { - signal.clear() - } } this.log?.trace('muxer sink ended') - this.close(error, reason) + if (error != null) { + this.abort(error, reason) + } else { + await this.close({ reason }) + } } this.numInboundStreams = 0 @@ -261,34 +261,47 @@ export class YamuxMuxer implements StreamMuxer { /** * Close the muxer - * - * @param err - * @param reason - The GoAway reason to be sent */ - close (err?: Error, reason?: GoAwayCode): void { + async close (options: CloseOptions = {}): Promise { if (this.closeController.signal.aborted) { // already closed return } - // If reason was provided, use that, otherwise use the presence of `err` to determine the reason - reason = reason ?? (err === undefined ? GoAwayCode.InternalError : GoAwayCode.NormalTermination) + const reason = options?.reason ?? GoAwayCode.NormalTermination - if (err != null) { - this.log?.error('muxer close reason=%s error=%s', GoAwayCode[reason], err) - } else { - this.log?.trace('muxer close reason=%s', GoAwayCode[reason]) + this.log?.trace('muxer close reason=%s', reason) + + options.signal = options.signal ?? AbortSignal.timeout(CLOSE_TIMEOUT) + + try { + await Promise.all( + [...this._streams.values()].map(async s => s.close(options)) + ) + + // send reason to the other side, allow the other side to close gracefully + this.sendGoAway(reason) + + this._closeMuxer() + } catch (err: any) { + this.abort(err) } + } - // If err is provided, abort all underlying streams, else close all underlying streams - if (err === undefined) { - for (const stream of this._streams.values()) { - stream.close() - } - } else { - for (const stream of this._streams.values()) { - stream.abort(err) - } + abort (err: Error, reason?: GoAwayCode): void { + if (this.closeController.signal.aborted) { + // already closed + return + } + + reason = reason ?? GoAwayCode.InternalError + + // If reason was provided, use that, otherwise use the presence of `err` to determine the reason + this.log?.error('muxer abort reason=%s error=%s', reason, err) + + // Abort all underlying streams + for (const stream of this._streams.values()) { + stream.abort(err) } // send reason to the other side, allow the other side to close gracefully @@ -319,16 +332,16 @@ export class YamuxMuxer implements StreamMuxer { } const stream = new YamuxStream({ - id, + id: id.toString(), name, state, direction, sendFrame: this.sendFrame.bind(this), - onStreamEnd: () => { + onEnd: () => { this.closeStream(id) this.onStreamEnd?.(stream) }, - log: this.log, + log: logger(`libp2p:yamux:${direction}:${id}`), config: this.config, getRTT: this.getRTT.bind(this) }) diff --git a/src/stream.ts b/src/stream.ts index 7167e78..09b9ebf 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -1,12 +1,10 @@ -import { CodeError } from '@libp2p/interfaces/errors' -import { abortableSource } from 'abortable-iterator' -import { pushable, type Pushable } from 'it-pushable' -import { ERR_RECV_WINDOW_EXCEEDED, ERR_STREAM_ABORT, ERR_STREAM_RESET, INITIAL_STREAM_WINDOW } from './constants.js' +import { CodeError } from '@libp2p/interface/errors' +import { AbstractStream, type AbstractStreamInit } from '@libp2p/interface/stream-muxer/stream' +import each from 'it-foreach' +import { ERR_RECV_WINDOW_EXCEEDED, ERR_STREAM_ABORT, INITIAL_STREAM_WINDOW } from './constants.js' import { Flag, type FrameHeader, FrameType, HEADER_LENGTH } from './frame.js' import type { Config } from './config.js' -import type { Stream, StreamStat } from '@libp2p/interface-connection' -import type { Logger } from '@libp2p/logger' -import type { Sink, Source } from 'it-stream-types' +import type { AbortOptions } from '@libp2p/interface' import type { Uint8ArrayList } from 'uint8arraylist' export enum StreamState { @@ -17,46 +15,20 @@ export enum StreamState { Finished, } -export enum HalfStreamState { - Open, - Closed, - Reset, -} - -export interface YamuxStreamInit { - id: number +export interface YamuxStreamInit extends AbstractStreamInit { name?: string sendFrame: (header: FrameHeader, body?: Uint8Array) => void - onStreamEnd: () => void getRTT: () => number config: Config state: StreamState - log?: Logger - direction: 'inbound' | 'outbound' } /** YamuxStream is used to represent a logical stream within a session */ -export class YamuxStream implements Stream { - id: string +export class YamuxStream extends AbstractStream { name?: string - stat: StreamStat - metadata: Record - state: StreamState - /** Used to track received FIN/RST */ - readState: HalfStreamState - /** Used to track sent FIN/RST */ - writeState: HalfStreamState - - /** Input to the read side of the stream */ - sourceInput: Pushable - /** Read side of the stream */ - source: AsyncGenerator - /** Write side of the stream */ - sink: Sink, Promise> private readonly config: Config - private readonly log?: Logger private readonly _id: number /** The number of available bytes to send */ @@ -77,207 +49,112 @@ export class YamuxStream implements Stream { private epochStart: number private readonly getRTT: () => number - /** Used to stop the sink */ - private readonly abortController: AbortController - private readonly sendFrame: (header: FrameHeader, body?: Uint8Array) => void - private readonly onStreamEnd: () => void constructor (init: YamuxStreamInit) { - this.config = init.config - this.log = init.log - this._id = init.id - this.id = String(init.id) - this.name = init.name - this.stat = { - direction: init.direction, - timeline: { - open: Date.now() + super({ + ...init, + onEnd: (err?: Error) => { + this.state = StreamState.Finished + init.onEnd?.(err) } - } - this.metadata = {} + }) + this.config = init.config + this._id = parseInt(init.id, 10) + this.name = init.name this.state = init.state - this.readState = HalfStreamState.Open - this.writeState = HalfStreamState.Open - this.sendWindowCapacity = INITIAL_STREAM_WINDOW this.recvWindow = this.config.initialStreamWindowSize this.recvWindowCapacity = this.recvWindow this.epochStart = Date.now() this.getRTT = init.getRTT - this.abortController = new AbortController() - this.sendFrame = init.sendFrame - this.onStreamEnd = init.onStreamEnd - - this.sourceInput = pushable({ - onEnd: (err?: Error) => { - if (err != null) { - this.log?.error('stream source ended id=%s', this._id, err) - } else { - this.log?.trace('stream source ended id=%s', this._id) - } - this.closeRead() - } + this.source = each(this.source, () => { + this.sendWindowUpdate() }) - - this.source = this.createSource() - - this.sink = async (source: Source): Promise => { - if (this.writeState !== HalfStreamState.Open) { - throw new Error('stream closed for writing') - } - - source = abortableSource(source, this.abortController.signal, { returnOnAbort: true }) - - try { - for await (let data of source) { - // send in chunks, waiting for window updates - while (data.length !== 0) { - // wait for the send window to refill - if (this.sendWindowCapacity === 0) await this.waitForSendWindowCapacity() - - // send as much as we can - const toSend = Math.min(this.sendWindowCapacity, this.config.maxMessageSize - HEADER_LENGTH, data.length) - this.sendData(data.subarray(0, toSend)) - this.sendWindowCapacity -= toSend - data = data.subarray(toSend) - } - } - } catch (e) { - this.log?.error('stream sink error id=%s', this._id, e) - } finally { - this.log?.trace('stream sink ended id=%s', this._id) - this.closeWrite() - } - } - } - - private async * createSource (): AsyncGenerator { - try { - for await (const val of this.sourceInput) { - this.sendWindowUpdate() - yield val - } - } catch (err) { - const errCode = (err as { code: string }).code - if (errCode !== ERR_STREAM_ABORT) { - this.log?.error('stream source error id=%s', this._id, err) - throw err - } - } - } - - close (): void { - this.log?.trace('stream close id=%s', this._id) - this.closeRead() - this.closeWrite() } - closeRead (): void { - if (this.state === StreamState.Finished) { - return - } - - if (this.readState !== HalfStreamState.Open) { - return - } - - this.log?.trace('stream close read id=%s', this._id) - - this.readState = HalfStreamState.Closed - - // close the source - this.sourceInput.end() + /** + * Send a message to the remote muxer informing them a new stream is being + * opened. + * + * This is a noop for Yamux because the first window update is sent when + * .newStream is called on the muxer which opens the stream on the remote. + */ + async sendNewStream (): Promise { - // If the both read and write are closed, finish it - if (this.writeState !== HalfStreamState.Open) { - this.finish() - } } - closeWrite (): void { - if (this.state === StreamState.Finished) { - return - } - - if (this.writeState !== HalfStreamState.Open) { - return - } + /** + * Send a data message to the remote muxer + */ + async sendData (buf: Uint8ArrayList, options: AbortOptions = {}): Promise { + buf = buf.sublist() + + // send in chunks, waiting for window updates + while (buf.byteLength !== 0) { + // wait for the send window to refill + if (this.sendWindowCapacity === 0) { + await this.waitForSendWindowCapacity(options) + } - this.log?.trace('stream close write id=%s', this._id) + // check we didn't close while waiting for send window capacity + if (this.status !== 'open') { + return + } - this.writeState = HalfStreamState.Closed + // send as much as we can + const toSend = Math.min(this.sendWindowCapacity, this.config.maxMessageSize - HEADER_LENGTH, buf.length) + const flags = this.getSendFlags() - this.sendClose() + this.sendFrame({ + type: FrameType.Data, + flag: flags, + streamID: this._id, + length: toSend + }, buf.subarray(0, toSend)) - // close the sink - this.abortController.abort() + this.sendWindowCapacity -= toSend - // If the both read and write are closed, finish it - if (this.readState !== HalfStreamState.Open) { - this.finish() + buf.consume(toSend) } } - abort (err?: Error): void { - switch (this.state) { - case StreamState.Finished: - return - case StreamState.Init: - // we haven't sent anything, so we don't need to send a reset. - break - case StreamState.SYNSent: - case StreamState.SYNReceived: - case StreamState.Established: - // at least one direction is open, we need to send a reset. - this.sendReset() - break - default: - throw new Error('unreachable') - } - - if (err != null) { - this.log?.error('stream abort id=%s error=%s', this._id, err) - } else { - this.log?.trace('stream abort id=%s', this._id) - } - - this.onReset(new CodeError(String(err) ?? 'stream aborted', ERR_STREAM_ABORT)) + /** + * Send a reset message to the remote muxer + */ + async sendReset (): Promise { + this.sendFrame({ + type: FrameType.WindowUpdate, + flag: Flag.RST, + streamID: this._id, + length: 0 + }) } - reset (): void { - if (this.state === StreamState.Finished) { - return - } - - this.log?.trace('stream reset id=%s', this._id) - - this.onReset(new CodeError('stream reset', ERR_STREAM_RESET)) + /** + * Send a message to the remote muxer, informing them no more data messages + * will be sent by this end of the stream + */ + async sendCloseWrite (): Promise { + const flags = this.getSendFlags() | Flag.FIN + this.sendFrame({ + type: FrameType.WindowUpdate, + flag: flags, + streamID: this._id, + length: 0 + }) } /** - * Called when initiating and receiving a stream reset + * Send a message to the remote muxer, informing them no more data messages + * will be read by this end of the stream */ - private onReset (err: Error): void { - // Update stream state to reset / finished - if (this.writeState === HalfStreamState.Open) { - this.writeState = HalfStreamState.Reset - } - if (this.readState === HalfStreamState.Open) { - this.readState = HalfStreamState.Reset - } - this.state = StreamState.Finished + async sendCloseRead (): Promise { - // close both the source and sink - this.sourceInput.end(err) - this.abortController.abort() - - // and finish the stream - this.finish() } /** @@ -285,25 +162,34 @@ export class YamuxStream implements Stream { * * Will throw with ERR_STREAM_ABORT if the stream gets aborted */ - async waitForSendWindowCapacity (): Promise { - if (this.abortController.signal.aborted) { - throw new CodeError('stream aborted', ERR_STREAM_ABORT) - } + async waitForSendWindowCapacity (options: AbortOptions = {}): Promise { if (this.sendWindowCapacity > 0) { return } + + let resolve: () => void let reject: (err: Error) => void const abort = (): void => { - reject(new CodeError('stream aborted', ERR_STREAM_ABORT)) - } - this.abortController.signal.addEventListener('abort', abort) - await new Promise((_resolve, _reject) => { - this.sendWindowCapacityUpdate = () => { - this.abortController.signal.removeEventListener('abort', abort) - _resolve(undefined) + if (this.status === 'open') { + reject(new CodeError('stream aborted', ERR_STREAM_ABORT)) + } else { + // the stream was closed already, ignore the failure to send + resolve() } - reject = _reject - }) + } + options.signal?.addEventListener('abort', abort) + + try { + await new Promise((_resolve, _reject) => { + this.sendWindowCapacityUpdate = () => { + _resolve() + } + reject = _reject + resolve = _resolve + }) + } finally { + options.signal?.removeEventListener('abort', abort) + } } /** @@ -336,7 +222,8 @@ export class YamuxStream implements Stream { const data = await readData() this.recvWindowCapacity -= header.length - this.sourceInput.push(data) + + this.sourcePush(data) } /** @@ -349,23 +236,13 @@ export class YamuxStream implements Stream { } } if ((flags & Flag.FIN) === Flag.FIN) { - this.closeRead() + this.remoteCloseWrite() } if ((flags & Flag.RST) === Flag.RST) { this.reset() } } - /** - * finish sets the state and triggers eventual garbage collection of the stream - */ - private finish (): void { - this.log?.trace('stream finished id=%s', this._id) - this.state = StreamState.Finished - this.stat.timeline.close = Date.now() - this.onStreamEnd() - } - /** * getSendFlags determines any flags that are appropriate * based on the current stream state. @@ -422,33 +299,4 @@ export class YamuxStream implements Stream { length: delta }) } - - private sendData (data: Uint8Array): void { - const flags = this.getSendFlags() - this.sendFrame({ - type: FrameType.Data, - flag: flags, - streamID: this._id, - length: data.length - }, data) - } - - private sendClose (): void { - const flags = this.getSendFlags() | Flag.FIN - this.sendFrame({ - type: FrameType.WindowUpdate, - flag: flags, - streamID: this._id, - length: 0 - }) - } - - private sendReset (): void { - this.sendFrame({ - type: FrameType.WindowUpdate, - flag: Flag.RST, - streamID: this._id, - length: 0 - }) - } } diff --git a/test/bench/comparison.bench.ts b/test/bench/comparison.bench.ts index 6b05bb8..c601b30 100644 --- a/test/bench/comparison.bench.ts +++ b/test/bench/comparison.bench.ts @@ -23,7 +23,7 @@ describe('comparison benchmark', () => { id: `${name} send and receive ${numMessages} ${msgSize / 1024}KB chunks`, beforeEach: () => impl({ onIncomingStream: (stream) => { - void pipe(stream, drain).then(() => { stream.close() }) + void pipe(stream, drain).then(async () => { await stream.close() }) } }), fn: async ({ client, server }) => { diff --git a/test/codec.util.ts b/test/codec.util.ts index 999ec34..088db59 100644 --- a/test/codec.util.ts +++ b/test/codec.util.ts @@ -1,4 +1,4 @@ -import { CodeError } from '@libp2p/interfaces/errors' +import { CodeError } from '@libp2p/interface/errors' import { ERR_DECODE_INVALID_VERSION } from '../src/constants.js' import { type FrameHeader, HEADER_LENGTH, YAMUX_VERSION } from '../src/frame.js' diff --git a/test/compliance.spec.ts b/test/compliance.spec.ts index 44909f1..4ec47dc 100644 --- a/test/compliance.spec.ts +++ b/test/compliance.spec.ts @@ -1,6 +1,6 @@ /* eslint-env mocha */ -import tests from '@libp2p/interface-stream-muxer-compliance-tests' +import tests from '@libp2p/interface-compliance-tests/stream-muxer' import { TestYamux } from './util.js' describe('compliance', () => { diff --git a/test/mplex.util.ts b/test/mplex.util.ts index fe5c124..12ceeed 100644 --- a/test/mplex.util.ts +++ b/test/mplex.util.ts @@ -1,7 +1,7 @@ import { mplex } from '@libp2p/mplex' import { duplexPair } from 'it-pair/duplex' import { pipe } from 'it-pipe' -import type { StreamMuxer, StreamMuxerInit } from '@libp2p/interface-stream-muxer' +import type { StreamMuxer, StreamMuxerInit } from '@libp2p/interface/stream-muxer' import type { Source, Transform } from 'it-stream-types' const factory = mplex()() diff --git a/test/muxer.spec.ts b/test/muxer.spec.ts index 3827e91..0171258 100644 --- a/test/muxer.spec.ts +++ b/test/muxer.spec.ts @@ -4,15 +4,28 @@ import { expect } from 'aegir/chai' import { duplexPair } from 'it-pair/duplex' import { pipe } from 'it-pipe' import { ERR_MUXER_LOCAL_CLOSED } from '../src/constants.js' -import { sleep, testClientServer, testYamuxMuxer } from './util.js' +import { sleep, testClientServer, testYamuxMuxer, type YamuxFixture } from './util.js' describe('muxer', () => { + let client: YamuxFixture + let server: YamuxFixture + + afterEach(async () => { + if (client != null) { + await client.close() + } + + if (server != null) { + await server.close() + } + }) + it('test repeated close', async () => { const client1 = testYamuxMuxer('libp2p:yamux:1', true) // inspect logs to ensure its only closed once - client1.close() - client1.close() - client1.close() + await client1.close() + await client1.close() + await client1.close() }) it('test client<->client', async () => { @@ -46,7 +59,7 @@ describe('muxer', () => { }) it('test ping', async () => { - const { client, server } = testClientServer() + ({ client, server } = testClientServer()) server.pauseRead() const clientRTT = client.ping() @@ -59,13 +72,10 @@ describe('muxer', () => { await sleep(10) server.unpauseWrite() expect(await serverRTT).to.not.equal(0) - - client.close() - server.close() }) it('test multiple simultaneous pings', async () => { - const { client } = testClientServer() + ({ client, server } = testClientServer()) client.pauseWrite() const promise = [ @@ -84,32 +94,32 @@ describe('muxer', () => { // eslint-disable-next-line @typescript-eslint/dot-notation expect(client['nextPingID']).to.equal(1) - client.close() + await client.close() }) - it('test go away', () => { - const { client } = testClientServer() - client.close() - try { + it('test go away', async () => { + ({ client, server } = testClientServer()) + await client.close() + + expect(() => { client.newStream() - expect.fail('should not be able to open a stream after close') - } catch (e) { - expect((e as { code: string }).code).to.equal(ERR_MUXER_LOCAL_CLOSED) - } + }).to.throw().with.property('code', ERR_MUXER_LOCAL_CLOSED, 'should not be able to open a stream after close') }) it('test keep alive', async () => { - const { client } = testClientServer({ enableKeepAlive: true, keepAliveInterval: 10 }) + ({ client, server } = testClientServer({ enableKeepAlive: true, keepAliveInterval: 10 })) - await sleep(100) + await sleep(1000) // eslint-disable-next-line @typescript-eslint/dot-notation expect(client['nextPingID']).to.be.gt(2) - client.close() + await client.close() + await server.close() }) it('test max inbound streams', async () => { - const { client, server } = testClientServer({ maxInboundStreams: 1 }) + ({ client, server } = testClientServer({ maxInboundStreams: 1 })) + client.newStream() client.newStream() await sleep(10) @@ -119,7 +129,8 @@ describe('muxer', () => { }) it('test max outbound streams', async () => { - const { client, server } = testClientServer({ maxOutboundStreams: 1 }) + ({ client, server } = testClientServer({ maxOutboundStreams: 1 })) + client.newStream() await sleep(10) diff --git a/test/stream.spec.ts b/test/stream.spec.ts index ee5aa23..793eb31 100644 --- a/test/stream.spec.ts +++ b/test/stream.spec.ts @@ -4,14 +4,28 @@ import { expect } from 'aegir/chai' import { pipe } from 'it-pipe' import { type Pushable, pushable } from 'it-pushable' import { defaultConfig } from '../src/config.js' -import { ERR_STREAM_RESET } from '../src/constants.js' +import { ERR_RECV_WINDOW_EXCEEDED } from '../src/constants.js' import { GoAwayCode } from '../src/frame.js' -import { HalfStreamState, StreamState } from '../src/stream.js' -import { sleep, testClientServer } from './util.js' +import { StreamState } from '../src/stream.js' +import { sleep, testClientServer, type YamuxFixture } from './util.js' +import type { Uint8ArrayList } from 'uint8arraylist' describe('stream', () => { + let client: YamuxFixture + let server: YamuxFixture + + afterEach(async () => { + if (client != null) { + await client.close() + } + + if (server != null) { + await server.close() + } + }) + it('test send data - small', async () => { - const { client, server } = testClientServer({ initialStreamWindowSize: defaultConfig.initialStreamWindowSize }) + ({ client, server } = testClientServer({ initialStreamWindowSize: defaultConfig.initialStreamWindowSize })) const { default: drain } = await import('it-drain') const p = pushable() @@ -37,7 +51,7 @@ describe('stream', () => { }) it('test send data - large', async () => { - const { client, server } = testClientServer({ initialStreamWindowSize: defaultConfig.initialStreamWindowSize }) + ({ client, server } = testClientServer({ initialStreamWindowSize: defaultConfig.initialStreamWindowSize })) const { default: drain } = await import('it-drain') const p = pushable() @@ -65,7 +79,7 @@ describe('stream', () => { }) it('test send data - large with increasing recv window size', async () => { - const { client, server } = testClientServer({ initialStreamWindowSize: defaultConfig.initialStreamWindowSize }) + ({ client, server } = testClientServer({ initialStreamWindowSize: defaultConfig.initialStreamWindowSize })) const { default: drain } = await import('it-drain') const p = pushable() @@ -97,7 +111,7 @@ describe('stream', () => { }) it('test many streams', async () => { - const { client, server } = testClientServer() + ({ client, server } = testClientServer()) for (let i = 0; i < 1000; i++) { client.newStream() } @@ -108,11 +122,11 @@ describe('stream', () => { }) it('test many streams - ping pong', async () => { - const numStreams = 10 - const { client, server } = testClientServer({ + ({ client, server } = testClientServer({ // echo on incoming streams onIncomingStream: (stream) => { void pipe(stream, stream) } - }) + })) + const numStreams = 10 const p: Array> = [] for (let i = 0; i < numStreams; i++) { @@ -131,14 +145,14 @@ describe('stream', () => { expect(client.streams.length).to.equal(numStreams) expect(server.streams.length).to.equal(numStreams) - client.close() + await client.close() }) it('test stream close', async () => { - const { client, server } = testClientServer() + ({ client, server } = testClientServer()) const c1 = client.newStream() - c1.close() + await c1.close() await sleep(5) expect(c1.state).to.equal(StreamState.Finished) @@ -149,39 +163,36 @@ describe('stream', () => { }) it('test stream close read', async () => { - const { client, server } = testClientServer() + ({ client, server } = testClientServer()) const c1 = client.newStream() - c1.closeRead() + await c1.closeRead() await sleep(5) - expect(c1.readState).to.equal(HalfStreamState.Closed) - expect(c1.writeState).to.equal(HalfStreamState.Open) - const s1 = server.streams[0] expect(s1).to.not.be.undefined() - expect(s1.readState).to.equal(HalfStreamState.Open) - expect(s1.writeState).to.equal(HalfStreamState.Open) + expect(s1.readStatus).to.equal('ready') + expect(s1.writeStatus).to.equal('ready') }) it('test stream close write', async () => { - const { client, server } = testClientServer() + ({ client, server } = testClientServer()) const c1 = client.newStream() - c1.closeWrite() + await c1.closeWrite() await sleep(5) - expect(c1.readState).to.equal(HalfStreamState.Open) - expect(c1.writeState).to.equal(HalfStreamState.Closed) + expect(c1.readStatus).to.equal('ready') + expect(c1.writeStatus).to.equal('closed') const s1 = server.streams[0] expect(s1).to.not.be.undefined() - expect(s1.readState).to.equal(HalfStreamState.Closed) - expect(s1.writeState).to.equal(HalfStreamState.Open) + expect(s1.readStatus).to.equal('closed') + expect(s1.writeStatus).to.equal('ready') }) it('test window overflow', async () => { - const { client, server } = testClientServer({ maxMessageSize: defaultConfig.initialStreamWindowSize, initialStreamWindowSize: defaultConfig.initialStreamWindowSize }) + ({ client, server } = testClientServer({ maxMessageSize: defaultConfig.initialStreamWindowSize, initialStreamWindowSize: defaultConfig.initialStreamWindowSize })) const { default: drain } = await import('it-drain') const p = pushable() @@ -191,12 +202,10 @@ describe('stream', () => { const s1 = server.streams[0] const sendPipe = pipe(p, c1) - // eslint-disable-next-line @typescript-eslint/dot-notation - const c1SendData = c1['sendData'].bind(c1) - // eslint-disable-next-line @typescript-eslint/dot-notation - ;(c1 as any)['sendData'] = (data: Uint8Array): void => { - // eslint-disable-next-line @typescript-eslint/dot-notation - c1SendData(data) + const c1SendData = c1.sendData.bind(c1) + + c1.sendData = async (data: Uint8ArrayList): Promise => { + await c1SendData(data) // eslint-disable-next-line @typescript-eslint/dot-notation c1['sendWindowCapacity'] = defaultConfig.initialStreamWindowSize * 10 } @@ -211,16 +220,15 @@ describe('stream', () => { try { await Promise.all([sendPipe, recvPipe]) } catch (e) { - expect((e as { code: string }).code).to.equal(ERR_STREAM_RESET) + expect((e as { code: string }).code).to.equal(ERR_RECV_WINDOW_EXCEEDED) } - // eslint-disable-next-line @typescript-eslint/dot-notation - expect(client['remoteGoAway']).to.equal(GoAwayCode.ProtocolError) - // eslint-disable-next-line @typescript-eslint/dot-notation - expect(server['localGoAway']).to.equal(GoAwayCode.ProtocolError) + + expect(client).to.have.property('remoteGoAway', GoAwayCode.ProtocolError) + expect(server).to.have.property('localGoAway', GoAwayCode.ProtocolError) }) it('test stream sink error', async () => { - const { client, server } = testClientServer() + ({ client, server } = testClientServer()) // don't let the server respond server.pauseRead() @@ -237,7 +245,7 @@ describe('stream', () => { await sleep(10) // the client should close gracefully even though it was waiting to send more data - client.close() + await client.close() p.end() await sendPipe diff --git a/test/util.ts b/test/util.ts index e71b14b..3c28a7f 100644 --- a/test/util.ts +++ b/test/util.ts @@ -68,19 +68,16 @@ export function pauseableTransform (): { transform: Transform, Asy return { transform, pause, unpause } } +export interface YamuxFixture extends YamuxMuxer { + pauseRead: () => void + unpauseRead: () => void + pauseWrite: () => void + unpauseWrite: () => void +} + export function testClientServer (conf: YamuxMuxerInit = {}): { - client: YamuxMuxer & { - pauseRead: () => void - unpauseRead: () => void - pauseWrite: () => void - unpauseWrite: () => void - } - server: YamuxMuxer & { - pauseRead: () => void - unpauseRead: () => void - pauseWrite: () => void - unpauseWrite: () => void - } + client: YamuxFixture + server: YamuxFixture } { const pair = duplexPair() const client = testYamuxMuxer('libp2p:yamux:client', true, conf)