Skip to content

Commit

Permalink
Merge pull request #1 from fgiova/feature/sets-for-piscina
Browse files Browse the repository at this point in the history
feat!: Refactor implementing options for threads control
  • Loading branch information
fgiova committed Oct 29, 2023
2 parents 971a199 + 7ce10b9 commit 2f0b17a
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 37 deletions.
21 changes: 17 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,41 @@ npm i @fgiova/aws-signature

### Usage
```js
const { signRequest } = require("@fgiova/aws-signature");
const { Signer } = require("@fgiova/aws-signature");

const signature = signRequest({
const signer = new Signer();
const signature = signer.request({
method: "POST",
path: "/",
headers: {
host: "foo.us-bar-1.amazonaws.com",
},
body: "Action=SendMessage&MessageBody=test&Version=2012-11-05",
}, "sqs");

// To destroy the thread pool
signer.destroy();
```

### API
```js
signRequest(request: Request, service: string, region?: string, date?: Date): string
Signer(options?: SignerOptions)
Signer.request(request: Request, service: string, region?: string, date?: Date): string
Signer.destroy(): Promise<void>
```
#### Environment variables
* `AWS_ACCESS_KEY_ID` - The AWS access key ID to sign the request with.
* `AWS_SECRET_ACCESS_KEY` - The AWS secret access key to sign the request with.
* `AWS_REGION` - The AWS region to sign the request for

#### Parameters
* `SignerOptions` - The options for the signer. It can have the following properties:
* `minThreads` - Sets the minimum number of threads that are always running for this thread pool. The default is based on the number of available CPUs.
* `maxThreads` - Sets the maximum number of threads that can be running for this thread pool. The default is based on the number of available CPUs.
* `idleTimeout` - A timeout in milliseconds that specifies how long a Worker is allowed to be idle, i.e. not handling any tasks, before it is shut down. By default, this is immediate.
* `maxQueueSize` - The maximum number of tasks that may be scheduled to run, but not yet running due to lack of available threads, at a given time. By default, there is no limit. The special value 'auto' may be used to have Piscina calculate the maximum as the square of maxThreads.
* `concurrentTasksPerWorker` - Specifies how many tasks can share a single Worker thread simultaneously. The default is 1. This generally only makes sense to specify if there is some kind of asynchronous component to the task. Keep in mind that Worker threads are generally not built for handling I/O in parallel.
* `resourceLimits` - See [Node.js new Worker options](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options)
* `request` - The request to sign. It can be a string, a buffer, or an object with the following properties:
* `method` - The HTTP method of the request.
* `path` - The path of the request.
Expand All @@ -49,7 +62,7 @@ signRequest(request: Request, service: string, region?: string, date?: Date): st
* `date` - The date to sign the request for. If not specified, the date will be now

#### Returns
The signature of the request, as a string.
`Signer.request` - The signature of the request, as a string.

## License
Licensed under [MIT](./LICENSE).
86 changes: 58 additions & 28 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ import Piscina from "piscina";
import {HttpRequest} from "./aws/utils";
import {LRUCache} from "lru-cache";
import path from "node:path";
const isTS = path.resolve(__filename).endsWith(".ts");
import {ResourceLimits} from "worker_threads";
import {cpus} from "os";
export type {HttpRequest} from "./aws/utils";

/* c8 ignore start */
const isTS = path.resolve(__filename).endsWith(".ts");
const runEnv = {
ext: isTS ? "ts" : "js",
execArgv: isTS ? ["-r", "ts-node/register"] : undefined
Expand All @@ -16,34 +19,61 @@ const keyCache = new LRUCache<string, Buffer>({
ttl: 1000 * 60 * 60 * 24
});

const signRequestWorker = new Piscina({
filename: path.resolve(__dirname, `./sign_worker.${runEnv.ext}`),
name: "signRequest",
execArgv: runEnv.execArgv
});

const keyGeneratorWorker = new Piscina({
filename: path.resolve(__dirname, `./sign_worker.${runEnv.ext}`),
name: "generateKey",
execArgv: runEnv.execArgv
});

function millstoNextDay() {
const tomorrow = new Date();
tomorrow.setDate(tomorrow.getDate() + 1);
tomorrow.setHours(0, 0, 0, 0);
return Math.abs( tomorrow.getTime() - new Date().getTime() );
export type SignerOptions = {
minThreads?: number;
maxThreads?: number;
idleTimeout?: number;
maxQueue?: number | 'auto';
concurrentTasksPerWorker?: number;
resourceLimits?: ResourceLimits;
}

export async function signRequest (request: HttpRequest, service: string, region?: string, date = new Date()) {
const keyId = `${service}-${region}`;
let key = keyCache.get(keyId);
if(!key){
key = await keyGeneratorWorker.run({service, region, date}) as Buffer;
keyCache.set(keyId, key, {
ttl: millstoNextDay()
export class Signer {
private readonly worker: Piscina;
cpuCount : number = (() => {
try {
return cpus().length;
} catch {
/* istanbul ignore next */
return 1;
}
})()
constructor (options: SignerOptions= {}) {
const { minThreads, maxThreads, idleTimeout, maxQueue, concurrentTasksPerWorker, resourceLimits } = options;

this.worker = new Piscina({
filename: path.resolve(__dirname, `./sign_worker.${runEnv.ext}`),
execArgv: runEnv.execArgv,
name: "generateKey",
minThreads: minThreads ?? Math.max(this.cpuCount / 2, 1),
maxThreads: maxThreads ?? this.cpuCount * 1.5,
idleTimeout,
maxQueue,
concurrentTasksPerWorker,
resourceLimits
});
}
return await signRequestWorker.run({request, service, region, key, date}) as HttpRequest;
}
export type {HttpRequest} from "./aws/utils";

private millsToNextDay() {
const tomorrow = new Date();
tomorrow.setDate(tomorrow.getDate() + 1);
tomorrow.setHours(0, 0, 0, 0);
return Math.abs( tomorrow.getTime() - new Date().getTime() );
}

async request (request: HttpRequest, service: string, region?: string, date = new Date()) {
const keyId = `${service}-${region}`;
let key = keyCache.get(keyId);
if(!key){
key = await this.worker.run({service, region, date}) as Buffer;
keyCache.set(keyId, key, {
ttl: this.millsToNextDay()
});
}
return await this.worker.run({request, service, region, key, date}, {name:"signRequest"}) as HttpRequest;
}

async destroy () {
return this.worker.destroy();
}
}
35 changes: 30 additions & 5 deletions test/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { test } from "tap";
process.env.AWS_ACCESS_KEY_ID = "foo";
process.env.AWS_SECRET_ACCESS_KEY = "bar";
import {signRequest} from "../src/";
import {Signer} from "../src/";

test("Sign Request Worker", async (t) => {
t.beforeEach(async (t) => {
Expand All @@ -19,15 +19,22 @@ test("Sign Request Worker", async (t) => {
});

await t.test("should sign request without body", async (t) => {
const date = new Date("2000-01-01T00:00:00.000Z");
const request = await signRequest(t.context.requestData,"foo", "us-bar-1", date);
t.same(request.headers["Authorization"], "AWS4-HMAC-SHA256 Credential=foo/20000101/us-bar-1/foo/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=1e3b24fcfd7655c0c245d99ba7b6b5ca6174eab903ebfbda09ce457af062ad30");
try {
const date = new Date("2000-01-01T00:00:00.000Z");
const signer = new Signer();
const request = await signer.request(t.context.requestData, "foo", "us-bar-1", date);
t.same(request.headers["Authorization"], "AWS4-HMAC-SHA256 Credential=foo/20000101/us-bar-1/foo/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=1e3b24fcfd7655c0c245d99ba7b6b5ca6174eab903ebfbda09ce457af062ad30");

} catch (error) {
console.log(error);
}
});


await t.test("should sign requests with string bodies", async (t) => {
const date = new Date("2000-01-01T00:00:00.000Z");
const request = await signRequest({
const signer = new Signer();
const request = await signer.request({
...t.context.requestData,
body: "It was the best of times, it was the worst of times",
},
Expand All @@ -38,5 +45,23 @@ test("Sign Request Worker", async (t) => {
t.same(request.headers["Authorization"], "AWS4-HMAC-SHA256 Credential=foo/20000101/us-bar-1/foo/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=cf22a0befff359388f136b158f0b1b43db7b18d2ca65ce4112bc88a16815c4b6");
});

await t.test("should sign requests with string bodies without cache", async (t) => {
const date = new Date("2000-01-01T00:00:00.000Z");
const signer = new Signer();
const request = await signer.request({
...t.context.requestData,
body: "It was the best of times, it was the worst of times",
},
"foo-test-2",
"us-bar-1",
date);

t.same(request.headers["Authorization"], "AWS4-HMAC-SHA256 Credential=foo/20000101/us-bar-1/foo-test-2/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=1becd5fd94ce1e82b68d98fa70a9c99b4b4668eec909cf2f41f482426ac44970");
});

await t.test("destroy workers", async (t) => {
const signer = new Signer();
await t.resolves(signer.destroy());
});

});

0 comments on commit 2f0b17a

Please sign in to comment.