Commit f61e8c1

feat: support multiple parallel child_process
AriPerkkio committed Aug 21, 2023
1 parent 9423b6f commit f61e8c1
Showing 17 changed files with 224 additions and 134 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
@@ -81,8 +81,8 @@ jobs:
   - name: Test
     run: pnpm run test:ci

-  - name: Test Single Thread
-    run: pnpm run test:ci:single-thread
+  - name: Test No Threads
+    run: pnpm run test:ci:no-threads

   - name: Test Vm Threads
     run: pnpm run test:ci:vm-threads
2 changes: 1 addition & 1 deletion package.json
@@ -24,7 +24,7 @@
"test:all": "CI=true pnpm -r --stream run test --allowOnly",
"test:ci": "CI=true pnpm -r --stream --filter !test-fails --filter !test-browser --filter !test-esm --filter !test-browser run test --allowOnly",
"test:ci:vm-threads": "CI=true pnpm -r --stream --filter !test-fails --filter !test-single-thread --filter !test-browser --filter !test-esm --filter !test-browser run test --allowOnly --experimental-vm-threads",
"test:ci:single-thread": "CI=true pnpm -r --stream --filter !test-fails --filter !test-coverage --filter !test-watch --filter !test-bail --filter !test-esm --filter !test-browser run test --allowOnly --no-threads",
"test:ci:no-threads": "CI=true pnpm -r --stream --filter !test-fails --filter !test-coverage --filter !test-watch --filter !test-bail --filter !test-esm --filter !test-browser run test --allowOnly --no-threads",
"typecheck": "tsc --noEmit",
"typecheck:why": "tsc --noEmit --explainFiles > explainTypes.txt",
"ui:build": "vite build packages/ui",
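For reference, the `--no-threads` flag used by the renamed script switches Vitest from worker threads to the child_process pool this commit reworks. A minimal sketch of the config-file equivalent, assuming the Vitest 0.x `test.threads` option (this file is not part of the commit):

// vitest.config.ts — hypothetical, for illustration only
import { defineConfig } from 'vitest/config'

export default defineConfig({
  test: {
    // Same effect as passing --no-threads on the CLI:
    // run test files in forked child processes instead of worker threads.
    threads: false,
  },
})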
2 changes: 1 addition & 1 deletion packages/ui/package.json
@@ -69,7 +69,7 @@
"@vitest/ws-client": "workspace:*",
"@vueuse/core": "^10.2.1",
"ansi-to-html": "^0.7.2",
"birpc": "0.2.12",
"birpc": "0.2.13",
"codemirror": "^5.65.13",
"codemirror-theme-vars": "^0.1.2",
"cypress": "^12.16.0",
4 changes: 2 additions & 2 deletions packages/vitest/package.json
@@ -158,7 +158,7 @@
"std-env": "^3.3.3",
"strip-literal": "^1.0.1",
"tinybench": "^2.5.0",
"tinypool": "^0.7.0",
"tinypool": "^0.8.0",
"vite": "^3.0.0 || ^4.0.0",
"vite-node": "workspace:*",
"why-is-node-running": "^2.2.2"
@@ -176,7 +176,7 @@
"@types/micromatch": "^4.0.2",
"@types/prompts": "^2.4.4",
"@types/sinonjs__fake-timers": "^8.1.2",
"birpc": "0.2.12",
"birpc": "0.2.13",
"chai-subset": "^1.6.0",
"cli-truncate": "^3.1.0",
"event-target-polyfill": "^0.0.3",
4 changes: 2 additions & 2 deletions packages/vitest/src/api/setup.ts
@@ -21,7 +21,7 @@ export function setup(vitestOrWorkspace: Vitest | WorkspaceProject, server?: Vit

   const wss = new WebSocketServer({ noServer: true })

-  const clients = new Map<WebSocket, BirpcReturn<WebSocketEvents>>()
+  const clients = new Map<WebSocket, BirpcReturn<WebSocketEvents, WebSocketHandlers>>()

   ;(server || ctx.server).httpServer?.on('upgrade', (request, socket, head) => {
     if (!request.url)
@@ -155,7 +155,7 @@ class WebSocketReporter implements Reporter {
   constructor(
     public ctx: Vitest,
     public wss: WebSocketServer,
-    public clients: Map<WebSocket, BirpcReturn<WebSocketEvents>>,
+    public clients: Map<WebSocket, BirpcReturn<WebSocketEvents, WebSocketHandlers>>,
   ) {}

   onCollected(files?: File[]) {
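The widened `BirpcReturn<WebSocketEvents, WebSocketHandlers>` follows the birpc 0.2.13 bump above: the returned proxy type now carries both the remote functions it can call and the local functions it serves. A minimal sketch of that typing, with illustrative interfaces and transport wiring (not the code from this file):

import { createBirpc } from 'birpc'
import type { BirpcReturn } from 'birpc'

// Illustrative stand-ins for WebSocketEvents / WebSocketHandlers.
interface RemoteFunctions { onCollected: (files: string[]) => void }
interface LocalFunctions { getFiles: () => string[] }

// createBirpc<Remote, Local> yields BirpcReturn<Remote, Local>:
// remote functions become callable on the proxy, local ones are exposed to the peer.
const rpc: BirpcReturn<RemoteFunctions, LocalFunctions> = createBirpc<RemoteFunctions, LocalFunctions>(
  { getFiles: () => ['example.test.ts'] },
  {
    post: msg => process.send?.(msg), // send to the peer process
    on: fn => process.on('message', fn), // receive from the peer process
  },
)

void rpc.onCollected(['example.test.ts'])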
252 changes: 172 additions & 80 deletions packages/vitest/src/node/pools/child.ts
@@ -1,37 +1,50 @@
import v8 from 'node:v8'
import type { ChildProcess } from 'node:child_process'
import { fork } from 'node:child_process'
import { fileURLToPath, pathToFileURL } from 'node:url'
import { cpus } from 'node:os'
import EventEmitter from 'node:events'
import { Tinypool } from 'tinypool'
import type { TinypoolChannel, Options as TinypoolOptions } from 'tinypool'
import { createBirpc } from 'birpc'
import { resolve } from 'pathe'
import type { ContextTestEnvironment, ResolvedConfig, RunnerRPC, RuntimeRPC, Vitest } from '../../types'
import type { ChildContext } from '../../types/child'
import type { PoolProcessOptions, ProcessPool, WorkspaceSpec } from '../pool'
import type { PoolProcessOptions, ProcessPool, RunWithFiles } from '../pool'
import { distDir } from '../../paths'
import { groupBy } from '../../utils/base'
import { envsOrder, groupFilesByEnv } from '../../utils/test-helpers'
import type { WorkspaceProject } from '../workspace'
import { envsOrder, groupFilesByEnv } from '../../utils/test-helpers'
import { groupBy } from '../../utils'
import { createMethodsRPC } from './rpc'

const childPath = fileURLToPath(pathToFileURL(resolve(distDir, './child.js')).href)

function setupChildProcessChannel(project: WorkspaceProject, fork: ChildProcess): void {
function createChildProcessChannel(project: WorkspaceProject) {
const emitter = new EventEmitter()
const cleanup = () => emitter.removeAllListeners()

const events = { message: 'message', response: 'response' }
const channel: TinypoolChannel = {
onMessage: callback => emitter.on(events.message, callback),
postMessage: message => emitter.emit(events.response, message),
}

const rpc = createBirpc<RunnerRPC, RuntimeRPC>(
createMethodsRPC(project),
{
eventNames: ['onCancel'],
serialize: v8.serialize,
deserialize: v => v8.deserialize(Buffer.from(v)),
post(v) {
fork.send(v)
emitter.emit(events.message, v)
},
on(fn) {
fork.on('message', fn)
emitter.on(events.response, fn)
},
},
)

project.ctx.onCancel(reason => rpc.onCancel(reason))

return { channel, cleanup }
}

function stringifyRegex(input: RegExp | string): string {
@@ -40,101 +53,180 @@ function stringifyRegex(input: RegExp | string): string {
return `$$vitest:${input.toString()}`
}

function getTestConfig(ctx: WorkspaceProject): ResolvedConfig {
const config = ctx.getSerializableConfig()
// v8 serialize does not support regex
return <ResolvedConfig>{
...config,
testNamePattern: config.testNamePattern
? stringifyRegex(config.testNamePattern)
: undefined,
export function createChildProcessPool(ctx: Vitest, { execArgv, env }: PoolProcessOptions): ProcessPool {
const threadsCount = ctx.config.watch
? Math.max(Math.floor(cpus().length / 2), 1)
: Math.max(cpus().length - 1, 1)

const maxThreads = ctx.config.maxThreads ?? threadsCount
const minThreads = ctx.config.minThreads ?? threadsCount

const options: TinypoolOptions = {
runtime: 'child_process',
filename: childPath,

maxThreads,
minThreads,

env,
execArgv,

terminateTimeout: ctx.config.teardownTimeout,
}
}

export function createChildProcessPool(ctx: Vitest, { execArgv, env }: PoolProcessOptions): ProcessPool {
const children = new Set<ChildProcess>()
if (ctx.config.isolate) {
options.isolateWorkers = true
options.concurrentTasksPerWorker = 1
}

const Sequencer = ctx.config.sequence.sequencer
const sequencer = new Sequencer(ctx)
if (ctx.config.singleThread) {
options.concurrentTasksPerWorker = 1
options.maxThreads = 1
options.minThreads = 1
}

function runFiles(project: WorkspaceProject, files: string[], environment: ContextTestEnvironment, invalidates: string[] = []) {
const config = getTestConfig(project)
ctx.state.clearFiles(project, files)
const pool = new Tinypool(options)

const runWithFiles = (name: string): RunWithFiles => {
let id = 0

async function runFiles(project: WorkspaceProject, config: ResolvedConfig, files: string[], environment: ContextTestEnvironment, invalidates: string[] = []) {
ctx.state.clearFiles(project, files)
const { channel, cleanup } = createChildProcessChannel(project)
const workerId = ++id
const data: ChildContext = {
config,
files,
invalidates,
environment,
workerId,
}
try {
await pool.run(data, { name, channel })
}
catch (error) {
// Worker got stuck and won't terminate - this may cause process to hang
if (error instanceof Error && /Failed to terminate worker/.test(error.message))
ctx.state.addProcessTimeoutCause(`Failed to terminate worker while running ${files.join(', ')}.`)

const data: ChildContext = {
command: 'start',
config,
files,
invalidates,
environment,
}
// Intentionally cancelled
else if (ctx.isCancelling && error instanceof Error && /The task has been cancelled/.test(error.message))
ctx.state.cancelFiles(files, ctx.config.root)

const child = fork(childPath, [], {
execArgv,
env,
// TODO: investigate
// serialization: 'advanced',
})
children.add(child)
setupChildProcessChannel(project, child)

return new Promise<void>((resolve, reject) => {
child.send(data, (err) => {
if (err)
reject(err)
})
child.on('close', (code) => {
if (!code)
resolve()
else
reject(new Error(`Child process exited unexpectedly with code ${code}`))
throw error
}
finally {
cleanup()
}
}

children.delete(child)
})
})
}
const Sequencer = ctx.config.sequence.sequencer
const sequencer = new Sequencer(ctx)

return async (specs, invalidates) => {
// Cancel pending tasks from pool when possible
ctx.onCancel(() => pool.cancelPendingTasks())

async function runTests(specs: WorkspaceSpec[], invalidates: string[] = []): Promise<void> {
const { shard } = ctx.config
const configs = new Map<WorkspaceProject, ResolvedConfig>()
const getConfig = (project: WorkspaceProject): ResolvedConfig => {
if (configs.has(project))
return configs.get(project)!

if (shard)
specs = await sequencer.shard(specs)
const _config = project.getSerializableConfig()

const config = {
..._config,
// v8 serialize does not support regex
testNamePattern: _config.testNamePattern
? stringifyRegex(_config.testNamePattern)
: undefined,
} as ResolvedConfig

configs.set(project, config)
return config
}

const workspaceMap = new Map<string, WorkspaceProject[]>()
for (const [project, file] of specs) {
const workspaceFiles = workspaceMap.get(file) ?? []
workspaceFiles.push(project)
workspaceMap.set(file, workspaceFiles)
}

specs = await sequencer.sort(specs)
// it's possible that project defines a file that is also defined by another project
const { shard } = ctx.config

if (shard)
specs = await sequencer.shard(specs)

specs = await sequencer.sort(specs)

// TODO: What to do about singleThread flag?
const singleThreads = specs.filter(([project]) => project.config.singleThread)
const multipleThreads = specs.filter(([project]) => !project.config.singleThread)

if (multipleThreads.length) {
const filesByEnv = await groupFilesByEnv(multipleThreads)
const files = Object.values(filesByEnv).flat()
const results: PromiseSettledResult<void>[] = []

if (ctx.config.isolate) {
results.push(...await Promise.allSettled(files.map(({ file, environment, project }) =>
runFiles(project, getConfig(project), [file], environment, invalidates))))
}
else {
// When isolation is disabled, we still need to isolate environments and workspace projects from each other.
// Tasks are still running parallel but environments are isolated between tasks.
const grouped = groupBy(files, ({ project, environment }) => project.getName() + environment.name + JSON.stringify(environment.options))

for (const group of Object.values(grouped)) {
// Push all files to pool's queue
results.push(...await Promise.allSettled(group.map(({ file, environment, project }) =>
runFiles(project, getConfig(project), [file], environment, invalidates))))

// Once all tasks are running or finished, recycle worker for isolation.
// On-going workers will run in the previous environment.
await new Promise<void>(resolve => pool.queueSize === 0 ? resolve() : pool.once('drain', resolve))
await pool.recycleWorkers()
}
}

const errors = results.filter((r): r is PromiseRejectedResult => r.status === 'rejected').map(r => r.reason)
if (errors.length > 0)
throw new AggregateError(errors, 'Errors occurred while running tests. For more information, see serialized error.')
}

const filesByEnv = await groupFilesByEnv(specs)
const envs = envsOrder.concat(
Object.keys(filesByEnv).filter(env => !envsOrder.includes(env)),
)
if (singleThreads.length) {
const filesByEnv = await groupFilesByEnv(singleThreads)
const envs = envsOrder.concat(
Object.keys(filesByEnv).filter(env => !envsOrder.includes(env)),
)

// always run environments isolated between each other
for (const env of envs) {
const files = filesByEnv[env]
for (const env of envs) {
const files = filesByEnv[env]

if (!files?.length)
continue
if (!files?.length)
continue

const filesByOptions = groupBy(files, ({ project, environment }) => project.getName() + JSON.stringify(environment.options) + environment.transformMode)
const filesByOptions = groupBy(files, ({ project, environment }) => project.getName() + JSON.stringify(environment.options))

for (const option in filesByOptions) {
const files = filesByOptions[option]
for (const files of Object.values(filesByOptions)) {
// Always run environments isolated between each other
await pool.recycleWorkers()

if (files?.length) {
const filenames = files.map(f => f.file)
await runFiles(files[0].project, filenames, files[0].environment, invalidates)
const filenames = files.map(f => f.file)
await runFiles(files[0].project, getConfig(files[0].project), filenames, files[0].environment, invalidates)
}
}
}
}
}

return {
runTests,
async close() {
children.forEach((child) => {
if (!child.killed)
child.kill()
})
children.clear()
runTests: runWithFiles('run'),
close: async () => {
await pool.destroy()
},
}
}
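Condensed from the diff above: the new pool replaces the hand-rolled fork() bookkeeping with Tinypool 0.8's `runtime: 'child_process'`, bridging each task to birpc through an in-process EventEmitter that implements the `TinypoolChannel` interface. A stripped-down sketch of that wiring (isolation, sequencing and error handling from the real implementation are omitted; `worker.js` and the RPC methods are placeholders):

import v8 from 'node:v8'
import EventEmitter from 'node:events'
import { Tinypool } from 'tinypool'
import type { TinypoolChannel } from 'tinypool'
import { createBirpc } from 'birpc'

function createChannel(methods: Record<string, (...args: any[]) => unknown>) {
  const emitter = new EventEmitter()

  // Tinypool pulls host -> worker traffic via onMessage and
  // delivers worker -> host traffic by calling postMessage.
  const channel: TinypoolChannel = {
    onMessage: callback => emitter.on('message', callback),
    postMessage: message => emitter.emit('response', message),
  }

  createBirpc(methods, {
    serialize: v8.serialize,
    deserialize: v => v8.deserialize(Buffer.from(v)),
    post: v => emitter.emit('message', v), // host -> worker
    on: fn => emitter.on('response', fn), // worker -> host
  })

  return { channel, cleanup: () => emitter.removeAllListeners() }
}

const pool = new Tinypool({
  runtime: 'child_process', // fork child processes instead of worker_threads
  filename: './worker.js', // placeholder worker entry point
  minThreads: 1,
  maxThreads: 4,
})

// Each task gets its own channel, so multiple child processes can run in parallel.
const { channel, cleanup } = createChannel({ /* host-side RPC methods */ })
try {
  await pool.run({ files: ['example.test.ts'] }, { name: 'run', channel })
}
finally {
  cleanup()
}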