Skip to content

Commit

Permalink
fix: lru cache on internal pool (#392)
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos authored Nov 6, 2023
1 parent 84c627e commit 125082f
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 46 deletions.
49 changes: 14 additions & 35 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
"@fastify/rate-limit": "^7.6.0",
"@fastify/swagger": "^8.3.1",
"@fastify/swagger-ui": "^1.7.0",
"@isaacs/ttlcache": "^1.4.1",
"@tus/file-store": "^1.0.0-beta.1",
"@tus/s3-store": "https://gitpkg.now.sh/supabase/tus-node-server/packages/s3-store/dist?build",
"@tus/server": "https://gitpkg.now.sh/supabase/tus-node-server/packages/server/dist?build",
Expand Down Expand Up @@ -70,7 +71,6 @@
"@types/jest": "^29.2.1",
"@types/js-yaml": "^4.0.5",
"@types/jsonwebtoken": "^8.5.8",
"@types/lru-cache": "^7.10.10",
"@types/mustache": "^4.2.2",
"@types/node": "^18.14.6",
"@types/pg": "^8.6.4",
Expand Down
24 changes: 17 additions & 7 deletions src/database/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import pg, { DatabaseError } from 'pg'
import { Knex, knex } from 'knex'
import { JwtPayload } from 'jsonwebtoken'
import retry from 'async-retry'
import TTLCache from '@isaacs/ttlcache'
import { getConfig } from '../config'
import { DbActiveConnection, DbActivePool } from '../monitoring/metrics'
import { StorageBackendError } from '../storage'
Expand All @@ -11,6 +12,7 @@ import KnexTimeoutError = knex.KnexTimeoutError
pg.types.setTypeParser(20, 'text', parseInt)

const {
isMultitenant,
databaseSSLRootCert,
databaseMaxConnections,
databaseFreePoolAfterInactivity,
Expand All @@ -35,7 +37,19 @@ export interface User {
payload: { role?: string } & JwtPayload
}

export const connections = new Map<string, Knex>()
const multiTenantLRUConfig = {
ttl: 1000 * 10,
updateAgeOnGet: true,
checkAgeOnGet: true,
}
export const connections = new TTLCache<string, Knex>({
...(isMultitenant ? multiTenantLRUConfig : { max: 1, ttl: Infinity }),
dispose: async (pool) => {
if (!pool) return
await pool.destroy()
pool.client.removeAllListeners()
},
})
const searchPath = ['storage', 'public', 'extensions']

export class TenantConnection {
Expand All @@ -50,10 +64,12 @@ export class TenantConnection {

static stop() {
const promises: Promise<void>[] = []

for (const [connectionString, pool] of connections) {
promises.push(pool.destroy())
connections.delete(connectionString)
}

return Promise.allSettled(promises)
}

Expand Down Expand Up @@ -104,12 +120,6 @@ export class TenantConnection {
})

if (!isExternalPool) {
knexPool.client.pool.on('poolDestroySuccess', () => {
if (connections.get(connectionString) === knexPool) {
connections.delete(connectionString)
}
})

connections.set(connectionString, knexPool)
}

Expand Down
20 changes: 18 additions & 2 deletions src/queue/events/multipart-upload-completed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { BaseEvent, BasePayload } from './base-event'
import { Job } from 'pg-boss'
import { getConfig } from '../../config'
import { S3Backend } from '../../storage/backend'
import { isS3Error } from '../../storage'
import { isS3Error, Storage } from '../../storage'
import { logger } from '../../monitoring'

interface UploadCompleted extends BasePayload {
Expand All @@ -17,8 +17,9 @@ export class MultiPartUploadCompleted extends BaseEvent<UploadCompleted> {
static queueName = 'multipart:upload:completed'

static async handle(job: Job<UploadCompleted>) {
let storage: Storage | undefined = undefined
try {
const storage = await this.createStorage(job.data)
storage = await this.createStorage(job.data)
const version = job.data.version

const s3Key = `${job.data.tenant.ref}/${job.data.bucketName}/${job.data.objectName}/${version}`
Expand All @@ -32,6 +33,21 @@ export class MultiPartUploadCompleted extends BaseEvent<UploadCompleted> {
}
logger.error({ error: e }, 'multi part uploaded completed failed')
throw e
} finally {
if (storage) {
const tenant = storage.db.tenant()
storage.db
.destroyConnection()
.then(() => {
// no-op
})
.catch((e) => {
logger.error(
{ error: e },
`[Admin]: MultiPartUploadCompleted ${tenant.ref} - FAILED DISPOSING CONNECTION`
)
})
}
}
}
}
20 changes: 19 additions & 1 deletion src/queue/events/object-admin-delete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { getConfig } from '../../config'
import { Job, WorkOptions } from 'pg-boss'
import { withOptionalVersion } from '../../storage/backend'
import { logger, logSchema } from '../../monitoring'
import { Storage } from '../../storage'

export interface ObjectDeleteEvent extends BasePayload {
name: string
Expand All @@ -23,8 +24,10 @@ export class ObjectAdminDelete extends BaseEvent<ObjectDeleteEvent> {
}

static async handle(job: Job<ObjectDeleteEvent>) {
let storage: Storage | undefined = undefined

try {
const storage = await this.createStorage(job.data)
storage = await this.createStorage(job.data)
const version = job.data.version

const s3Key = `${job.data.tenant.ref}/${job.data.bucketId}/${job.data.name}`
Expand Down Expand Up @@ -62,6 +65,21 @@ export class ObjectAdminDelete extends BaseEvent<ObjectDeleteEvent> {
`[Admin]: ObjectAdminDelete ${s3Key} - FAILED`
)
throw e
} finally {
if (storage) {
const tenant = storage.db.tenant()
storage.db
.destroyConnection()
.then(() => {
// no-op
})
.catch((e) => {
logger.error(
{ error: e },
`[Admin]: ObjectAdminDelete ${tenant.ref} - FAILED DISPOSING CONNECTION`
)
})
}
}
}
}
2 changes: 2 additions & 0 deletions src/storage/database/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,6 @@ export interface Database {

searchObjects(bucketId: string, prefix: string, options: SearchObjectOption): Promise<Obj[]>
healthcheck(): Promise<void>

destroyConnection(): Promise<void>
}
4 changes: 4 additions & 0 deletions src/storage/database/knex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,10 @@ export class StorageKnexDB implements Database {
})
}

destroyConnection() {
return this.connection.dispose()
}

protected async runQuery<T extends (db: Knex.Transaction) => Promise<any>>(
queryName: string,
fn: T
Expand Down

0 comments on commit 125082f

Please sign in to comment.