Skip to content

Commit

Permalink
change to parentSide/workerSide
Browse files Browse the repository at this point in the history
  • Loading branch information
rjawesome committed Jul 29, 2024
1 parent 2c6fa42 commit f298a3d
Showing 1 changed file with 9 additions and 9 deletions.
18 changes: 9 additions & 9 deletions src/controllers/threading/threadHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,20 +105,20 @@ async function queueTaskToWorkers(pool: Piscina, taskInfo: TaskInfo, route: stri
return new Promise((resolve, reject) => {
let workerThreadID: string;
const abortController = new AbortController();
const { port1: toWorker, port2: fromWorker } = new MessageChannel();
const { port1: workerSide, port2: parentSide } = new MessageChannel();

// get otel context

const otelData: Partial<{ traceparent: string; tracestate: string }> = {};
propagation.inject(context.active(), otelData);
const { traceparent, tracestate } = otelData;

const taskData: InnerTaskData = { req: taskInfo, route, traceparent, tracestate, port: toWorker };
const taskData: InnerTaskData = { req: taskInfo, route, traceparent, tracestate, port: workerSide };
taskData.req.data.options = {...taskData.req.data.options, metakg: global.metakg?.ops, smartapi: global.smartapi} as QueryHandlerOptions;

// Propagate data between task runner and bull job
if (job) taskData.job = { jobId: job.id, queueName: job.queue.name };
const task = pool.run(taskData, { signal: abortController.signal, transferList: [toWorker] });
const task = pool.run(taskData, { signal: abortController.signal, transferList: [workerSide] });
if (job) {
void job.update({ ...job.data, abortController });
}
Expand Down Expand Up @@ -157,7 +157,7 @@ async function queueTaskToWorkers(pool: Piscina, taskInfo: TaskInfo, route: stri
} = {};
const timeout = parseInt(process.env.REQUEST_TIMEOUT ?? (60 * 5).toString()) * 1000;

fromWorker.on("message", async (msg: ThreadMessage) => {
parentSide.on("message", async (msg: ThreadMessage) => {
switch (msg.type) {
default:
debug(`WARNING: received untyped message from thread {msg.threadId}`);
Expand Down Expand Up @@ -198,7 +198,7 @@ async function queueTaskToWorkers(pool: Piscina, taskInfo: TaskInfo, route: stri
await Promise.all(queries.map(async query => await Subquery.unfreeze(query))),
options,
({ hash, records, logs, apiUnavailable }) => {
fromWorker.postMessage({
parentSide.postMessage({
threadId: 0,
type: "subQueryResult",
value: { hash, records, logs, apiUnavailable },
Expand Down Expand Up @@ -359,20 +359,20 @@ export async function runBullJob(job: BullJob, route: string, useAsync = true) {
}

export function taskResponse<T>(response: T, status: number = undefined): T {
if (global.parentPort) {
global.parentPort.postMessage({ threadId, type: 'result', value: response, status } satisfies ThreadMessage);
if (global.workerSide) {
global.workerSide.postMessage({ threadId, type: 'result', value: response, status } satisfies ThreadMessage);
return undefined;
} else {
return response;
}
}

export function taskError(error: Error): void {
if (global.parentPort) {
if (global.workerSide) {
if (ErrorHandler.shouldHandleError(error)) {
Telemetry.captureException(error);
}
global.parentPort.postMessage({ threadId, type: 'error', value: error } satisfies ThreadMessage);
global.workerSide.postMessage({ threadId, type: 'error', value: error } satisfies ThreadMessage);
return undefined;
} else {
throw error;
Expand Down

0 comments on commit f298a3d

Please sign in to comment.