Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TTL cache on internal pool #392

Merged
merged 1 commit into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 14 additions & 35 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
"@fastify/rate-limit": "^7.6.0",
"@fastify/swagger": "^8.3.1",
"@fastify/swagger-ui": "^1.7.0",
"@isaacs/ttlcache": "^1.4.1",
"@tus/file-store": "^1.0.0-beta.1",
"@tus/s3-store": "https://gitpkg.now.sh/supabase/tus-node-server/packages/s3-store/dist?build",
"@tus/server": "https://gitpkg.now.sh/supabase/tus-node-server/packages/server/dist?build",
Expand Down Expand Up @@ -70,7 +71,6 @@
"@types/jest": "^29.2.1",
"@types/js-yaml": "^4.0.5",
"@types/jsonwebtoken": "^8.5.8",
"@types/lru-cache": "^7.10.10",
"@types/mustache": "^4.2.2",
"@types/node": "^18.14.6",
"@types/pg": "^8.6.4",
Expand Down
24 changes: 17 additions & 7 deletions src/database/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import pg, { DatabaseError } from 'pg'
import { Knex, knex } from 'knex'
import { JwtPayload } from 'jsonwebtoken'
import retry from 'async-retry'
import TTLCache from '@isaacs/ttlcache'
import { getConfig } from '../config'
import { DbActiveConnection, DbActivePool } from '../monitoring/metrics'
import { StorageBackendError } from '../storage'
Expand All @@ -11,6 +12,7 @@ import KnexTimeoutError = knex.KnexTimeoutError
pg.types.setTypeParser(20, 'text', parseInt)

const {
isMultitenant,
databaseSSLRootCert,
databaseMaxConnections,
databaseFreePoolAfterInactivity,
Expand All @@ -35,7 +37,19 @@ export interface User {
payload: { role?: string } & JwtPayload
}

export const connections = new Map<string, Knex>()
const multiTenantLRUConfig = {
ttl: 1000 * 10,
updateAgeOnGet: true,
checkAgeOnGet: true,
}
export const connections = new TTLCache<string, Knex>({
...(isMultitenant ? multiTenantLRUConfig : { max: 1, ttl: Infinity }),
dispose: async (pool) => {
if (!pool) return
await pool.destroy()
pool.client.removeAllListeners()
},
})
const searchPath = ['storage', 'public', 'extensions']

export class TenantConnection {
Expand All @@ -50,10 +64,12 @@ export class TenantConnection {

static stop() {
const promises: Promise<void>[] = []

for (const [connectionString, pool] of connections) {
promises.push(pool.destroy())
connections.delete(connectionString)
}

return Promise.allSettled(promises)
}

Expand Down Expand Up @@ -104,12 +120,6 @@ export class TenantConnection {
})

if (!isExternalPool) {
knexPool.client.pool.on('poolDestroySuccess', () => {
if (connections.get(connectionString) === knexPool) {
connections.delete(connectionString)
}
})

connections.set(connectionString, knexPool)
}

Expand Down
20 changes: 18 additions & 2 deletions src/queue/events/multipart-upload-completed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { BaseEvent, BasePayload } from './base-event'
import { Job } from 'pg-boss'
import { getConfig } from '../../config'
import { S3Backend } from '../../storage/backend'
import { isS3Error } from '../../storage'
import { isS3Error, Storage } from '../../storage'
import { logger } from '../../monitoring'

interface UploadCompleted extends BasePayload {
Expand All @@ -17,8 +17,9 @@ export class MultiPartUploadCompleted extends BaseEvent<UploadCompleted> {
static queueName = 'multipart:upload:completed'

static async handle(job: Job<UploadCompleted>) {
let storage: Storage | undefined = undefined
try {
const storage = await this.createStorage(job.data)
storage = await this.createStorage(job.data)
const version = job.data.version

const s3Key = `${job.data.tenant.ref}/${job.data.bucketName}/${job.data.objectName}/${version}`
Expand All @@ -32,6 +33,21 @@ export class MultiPartUploadCompleted extends BaseEvent<UploadCompleted> {
}
logger.error({ error: e }, 'multi part uploaded completed failed')
throw e
} finally {
if (storage) {
const tenant = storage.db.tenant()
storage.db
.destroyConnection()
.then(() => {
// no-op
})
.catch((e) => {
logger.error(
{ error: e },
`[Admin]: MultiPartUploadCompleted ${tenant.ref} - FAILED DISPOSING CONNECTION`
)
})
}
}
}
}
20 changes: 19 additions & 1 deletion src/queue/events/object-admin-delete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { getConfig } from '../../config'
import { Job, WorkOptions } from 'pg-boss'
import { withOptionalVersion } from '../../storage/backend'
import { logger, logSchema } from '../../monitoring'
import { Storage } from '../../storage'

export interface ObjectDeleteEvent extends BasePayload {
name: string
Expand All @@ -23,8 +24,10 @@ export class ObjectAdminDelete extends BaseEvent<ObjectDeleteEvent> {
}

static async handle(job: Job<ObjectDeleteEvent>) {
let storage: Storage | undefined = undefined

try {
const storage = await this.createStorage(job.data)
storage = await this.createStorage(job.data)
const version = job.data.version

const s3Key = `${job.data.tenant.ref}/${job.data.bucketId}/${job.data.name}`
Expand Down Expand Up @@ -62,6 +65,21 @@ export class ObjectAdminDelete extends BaseEvent<ObjectDeleteEvent> {
`[Admin]: ObjectAdminDelete ${s3Key} - FAILED`
)
throw e
} finally {
if (storage) {
const tenant = storage.db.tenant()
storage.db
.destroyConnection()
.then(() => {
// no-op
})
.catch((e) => {
logger.error(
{ error: e },
`[Admin]: ObjectAdminDelete ${tenant.ref} - FAILED DISPOSING CONNECTION`
)
})
}
}
}
}
2 changes: 2 additions & 0 deletions src/storage/database/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,6 @@ export interface Database {

searchObjects(bucketId: string, prefix: string, options: SearchObjectOption): Promise<Obj[]>
healthcheck(): Promise<void>

destroyConnection(): Promise<void>
}
4 changes: 4 additions & 0 deletions src/storage/database/knex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,10 @@ export class StorageKnexDB implements Database {
})
}

destroyConnection() {
return this.connection.dispose()
}

protected async runQuery<T extends (db: Knex.Transaction) => Promise<any>>(
queryName: string,
fn: T
Expand Down
Loading