Skip to content

Commit

Permalink
fix 400 error tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rjawesome committed Aug 12, 2024
1 parent 81719ee commit 64cbbd9
Showing 1 changed file with 36 additions and 26 deletions.
62 changes: 36 additions & 26 deletions src/controllers/threading/threadHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -241,33 +241,42 @@ export async function runTask(req: Request, res: Response, route: string, useBul
if (process.env.USE_THREADING === "false") {
// Set up "inter thread messaging"
const { port1: workerSide, port2: parentSide } = new MessageChannel();
parentSide.on("message", async (msg: ThreadMessage) => {
switch (msg.type) {
case "subqueryRequest":
const { queries, options } = msg.value as {
queries: FrozenSubquery[];
options: QueryHandlerOptions;
};
debug(`Main thread receives ${queries.length} subqueries from worker.`);
subqueryRelay.subscribe(
await Promise.all(queries.map(async query => await Subquery.unfreeze(query))),
options,
({ hash, records, logs, apiUnavailable }) => {
parentSide.postMessage({
threadId: 0,
type: "subQueryResult",
value: { hash, records, logs, apiUnavailable },
} satisfies ThreadMessage);
},
);
break;
}
});
global.workerSide = workerSide;
// Threading disabled, just use the provided function in main event loop
const response = (await tasks[route](taskInfo)) as TrapiResponse;
parentSide.close();
return response;

// start task
tasks[route](taskInfo);
return new Promise((resolve, reject) => {
parentSide.on("message", async (msg: ThreadMessage) => {
switch (msg.type) {
case "subqueryRequest":
const { queries, options } = msg.value as {
queries: FrozenSubquery[];
options: QueryHandlerOptions;
};
debug(`Main thread receives ${queries.length} subqueries from worker.`);
subqueryRelay.subscribe(
await Promise.all(queries.map(async query => await Subquery.unfreeze(query))),
options,
({ hash, records, logs, apiUnavailable }) => {
parentSide.postMessage({
threadId: 0,
type: "subQueryResult",
value: { hash, records, logs, apiUnavailable },
} satisfies ThreadMessage);
},
);
break;
case "result":
resolve(msg.value as TrapiResponse);
parentSide.close();
break;
case "error":
reject(msg.value as Error);
parentSide.close();
break;
}
});
});
} else if (!(queryQueue && useBullSync)) {
// Redis unavailable or query not to sync queue such as asyncquery_status
const response = await queueTaskToWorkers(
Expand Down Expand Up @@ -361,6 +370,7 @@ export async function runBullJob(job: BullJob, route: string, useAsync = true) {
export function taskResponse<T>(response: T, status: number = undefined): T {
if (global.workerSide) {
global.workerSide.postMessage({ threadId, type: 'result', value: response, status } satisfies ThreadMessage);
return undefined;
}
return response;
}
Expand Down

0 comments on commit 64cbbd9

Please sign in to comment.