Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: close webrtc streams #2074

Merged
merged 11 commits into from
Oct 5, 2023
21 changes: 20 additions & 1 deletion packages/interface/src/stream-muxer/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ interface Logger {

const ERR_STREAM_RESET = 'ERR_STREAM_RESET'
const ERR_SINK_INVALID_STATE = 'ERR_SINK_INVALID_STATE'
const DEFAULT_SEND_CLOSE_WRITE_TIMEOUT = 5000

export interface AbstractStreamInit {
/**
Expand Down Expand Up @@ -70,6 +71,12 @@ export interface AbstractStreamInit {
* connection when closing the writable end of the stream. (default: 500)
*/
closeTimeout?: number

/**
* After the stream sink has closed, a limit on how long it takes to send
* a close-write message to the remote peer.
*/
sendCloseWriteTimeout?: number
}

function isPromise (res?: any): res is Promise<void> {
Expand All @@ -96,6 +103,7 @@ export abstract class AbstractStream implements Stream {
private readonly onCloseWrite?: () => void
private readonly onReset?: () => void
private readonly onAbort?: (err: Error) => void
private readonly sendCloseWriteTimeout: number

protected readonly log: Logger

Expand All @@ -115,6 +123,7 @@ export abstract class AbstractStream implements Stream {
this.timeline = {
open: Date.now()
}
this.sendCloseWriteTimeout = init.sendCloseWriteTimeout ?? DEFAULT_SEND_CLOSE_WRITE_TIMEOUT

this.onEnd = init.onEnd
this.onCloseRead = init?.onCloseRead
Expand Down Expand Up @@ -180,7 +189,9 @@ export abstract class AbstractStream implements Stream {
this.writeStatus = 'closing'

this.log.trace('send close write to remote')
await this.sendCloseWrite(options)
await this.sendCloseWrite({
signal: AbortSignal.timeout(this.sendCloseWriteTimeout)
})

this.writeStatus = 'closed'
}
Expand Down Expand Up @@ -215,6 +226,10 @@ export abstract class AbstractStream implements Stream {
this.log.trace('source and sink ended')
this.timeline.close = Date.now()

if (this.status !== 'aborted' && this.status !== 'reset') {
this.status = 'closed'
}

if (this.onEnd != null) {
this.onEnd(this.endErr)
}
Expand All @@ -241,6 +256,10 @@ export abstract class AbstractStream implements Stream {
this.log.trace('sink and source ended')
this.timeline.close = Date.now()

if (this.status !== 'aborted' && this.status !== 'reset') {
this.status = 'closed'
}

if (this.onEnd != null) {
this.onEnd(this.endErr)
}
Expand Down
6 changes: 4 additions & 2 deletions packages/transport-webrtc/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@
"p-event": "^6.0.0",
"p-timeout": "^6.1.2",
"protons-runtime": "^5.0.0",
"race-signal": "^1.0.0",
"uint8arraylist": "^2.4.3",
"uint8arrays": "^4.0.6",
"wherearewe": "^2.0.1"
"uint8arrays": "^4.0.6"
},
"devDependencies": {
"@chainsafe/libp2p-yamux": "^5.0.0",
Expand All @@ -78,10 +78,12 @@
"@types/sinon": "^10.0.15",
"aegir": "^40.0.8",
"delay": "^6.0.0",
"it-drain": "^3.0.3",
"it-length": "^3.0.2",
"it-map": "^3.0.3",
"it-pair": "^2.0.6",
"libp2p": "^0.46.11",
"p-retry": "^6.1.0",
"protons": "^7.0.2",
"sinon": "^16.0.0",
"sinon-ts": "^1.0.0"
Expand Down
1 change: 0 additions & 1 deletion packages/transport-webrtc/src/muxer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ export class DataChannelMuxer implements StreamMuxer {
onEnd: () => {
log.trace('stream %s %s %s onEnd', stream.direction, stream.id, stream.protocol)
drainAndClose(channel, `outbound ${stream.id} ${stream.protocol}`, this.dataChannelOptions.drainTimeout)
channel.close() // Stream initiator is responsible for closing the channel
this.streams = this.streams.filter(s => s.id !== stream.id)
this.metrics?.increment({ stream_end: true })
this.init?.onStreamEnd?.(stream)
Expand Down
7 changes: 6 additions & 1 deletion packages/transport-webrtc/src/pb/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ syntax = "proto3";

message Message {
enum Flag {
// The sender will no longer send messages on the stream.
// The sender will no longer send messages on the stream. The recipient
// should send a FIN_ACK back to the sender.
FIN = 0;

// The sender will no longer read messages on the stream. Incoming data is
Expand All @@ -12,6 +13,10 @@ message Message {
// The sender abruptly terminates the sending part of the stream. The
// receiver can discard any data that it already received on that stream.
RESET = 2;

// The sender previously received a FIN.
// Workaround for https://bugs.chromium.org/p/chromium/issues/detail?id=1484907
FIN_ACK = 3;
}

optional Flag flag = 1;
Expand Down
6 changes: 4 additions & 2 deletions packages/transport-webrtc/src/pb/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ export namespace Message {
export enum Flag {
FIN = 'FIN',
STOP_SENDING = 'STOP_SENDING',
RESET = 'RESET'
RESET = 'RESET',
FIN_ACK = 'FIN_ACK'
}

enum __FlagValues {
FIN = 0,
STOP_SENDING = 1,
RESET = 2
RESET = 2,
FIN_ACK = 3
}

export namespace Flag {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { peerIdFromString } from '@libp2p/peer-id'
import { multiaddr, type Multiaddr } from '@multiformats/multiaddr'
import { pbStream } from 'it-protobuf-stream'
import pDefer, { type DeferredPromise } from 'p-defer'
import { isNode, isElectronMain } from 'wherearewe'
import { type RTCPeerConnection, RTCSessionDescription } from '../webrtc/index.js'
import { Message } from './pb/message.js'
import { SIGNALING_PROTO_ID, splitAddr, type WebRTCTransportMetrics } from './transport.js'
Expand Down Expand Up @@ -152,24 +151,17 @@ export async function initiateConnection ({ peerConnection, signal, metrics, mul
signal
})

// TODO: workaround for https://github.com/murat-dogan/node-datachannel/issues/196
if (!isNode && !isElectronMain) {
log.trace('initiator connected, closing init channel')
log.trace('initiator connected, closing init channel')
channel.close()

// close init channel as we are connected now
channel.close()
}

const remoteAddress = parseRemoteAddress(peerConnection.currentRemoteDescription?.sdp ?? '')

log.trace('initiator connected to remote address %s', remoteAddress)

// close the signalling stream
log.trace('initiator closing signalling stream')
await messageStream.unwrap().unwrap().close({
signal
})

log.trace('initiator closed signalling stream')
const remoteAddress = parseRemoteAddress(peerConnection.currentRemoteDescription?.sdp ?? '')

log.trace('initiator connected to remote address %s', remoteAddress)

return {
remoteAddress: multiaddr(remoteAddress).encapsulate(`/p2p/${peerId.toString()}`)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ export async function handleIncomingStream ({ peerConnection, stream, signal, co
})

log.trace('recipient connected, closing signaling stream')

await messageStream.unwrap().unwrap().close({
signal
})
Expand Down
67 changes: 42 additions & 25 deletions packages/transport-webrtc/src/private-to-private/util.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { logger } from '@libp2p/logger'
import { raceSignal } from 'race-signal'
import { isFirefox } from '../util.js'
import { RTCIceCandidate } from '../webrtc/index.js'
import { Message } from './pb/message.js'
Expand All @@ -12,40 +13,56 @@ export interface ReadCandidatesOptions extends AbortOptions {
}

export const readCandidatesUntilConnected = async (connectedPromise: DeferredPromise<void>, pc: RTCPeerConnection, stream: MessageStream<Message>, options: ReadCandidatesOptions): Promise<void> => {
while (true) {
const readResult = await Promise.race([
connectedPromise.promise,
stream.read(options)
])
try {
while (true) {
const readResult = await Promise.race([
connectedPromise.promise,
stream.read(options)
])

if (readResult == null) {
// connected promise resolved
break
}
if (readResult == null) {
// connected promise resolved
break
}

const message = readResult
const message = readResult

if (message.type !== Message.Type.ICE_CANDIDATE) {
throw new Error('expected only ice candidates')
}
if (message.type !== Message.Type.ICE_CANDIDATE) {
throw new Error('ICE candidate message expected')
}

// end of candidates has been signalled
if (message.data == null || message.data === '') {
log.trace('%s received end-of-candidates', options.direction)
break
}
// end of candidates has been signalled
if (message.data == null || message.data === '') {
log.trace('%s received end-of-candidates', options.direction)
break
}

log.trace('%s received new ICE candidate: %s', options.direction, message.data)
log.trace('%s received new ICE candidate: %s', options.direction, message.data)

try {
await pc.addIceCandidate(new RTCIceCandidate(JSON.parse(message.data)))
} catch (err) {
log.error('%s bad candidate received:', options.direction, err)
throw new Error('bad candidate received')
try {
await pc.addIceCandidate(new RTCIceCandidate(JSON.parse(message.data)))
} catch (err) {
log.error('%s bad candidate received:', options.direction, err)
throw new Error('bad candidate received')
}
}
} catch (err: any) {
// this happens when the remote PeerConnection's state has changed to
// connected before the final ICE candidate is sent and so they close
// the signalling stream while we are still reading from it - ignore
// the error and race the passed signal for our own connection state
// to change
if (err.code !== 'ERR_UNEXPECTED_EOF') {
log.error('error while reading ICE candidates', err)
throw err
}
}

await connectedPromise.promise
// read all available ICE candidates, wait for connection state change
await raceSignal(connectedPromise.promise, options.signal, {
errorMessage: 'Aborted before connected',
errorCode: 'ERR_ABORTED_BEFORE_CONNECTED'
})
}

export function resolveOnConnected (pc: RTCPeerConnection, promise: DeferredPromise<void>): void {
Expand Down
Loading
Loading