fetch(stream) add stream support for compressed and uncompressed data #4127

Merged
Jarred-Sumner merged 42 commits into main from ciro/fetch-streaming
Aug 22, 2023

Conversation

cirospaciari
Collaborator

@cirospaciari cirospaciari commented Aug 12, 2023

What does this PR do?

  • Documentation or TypeScript types (it's okay to leave the rest blank in this case)
  • Code changes

How did you verify your code works?

I wrote automated tests + existing tests

@github-actions
Contributor

github-actions bot commented Aug 12, 2023

zig fmt errors have been resolved. Thank you.

#c07180a368f1ac203a73157cf3dfbdef341a631a
zig v0.11.0-dev.4006+bf827d0b5

misctools/fetch.zig Outdated Show resolved Hide resolved
src/deps/uws.zig Outdated Show resolved Hide resolved
@cirospaciari cirospaciari changed the title WIP: fetch(stream) streams non compressed data in 64kb chunks (at least) WIP: fetch(stream) add stream support for compressed and uncompressed data Aug 17, 2023
@cirospaciari cirospaciari marked this pull request as ready for review August 17, 2023 23:33
@github-actions
Contributor

github-actions bot commented Aug 18, 2023

@cirospaciari 1 files with test failures on linux-x64-baseline:

  • test/js/third_party/fsevents/fsevents.test.ts

#c07180a368f1ac203a73157cf3dfbdef341a631a

@github-actions
Contributor

github-actions bot commented Aug 18, 2023

@cirospaciari 1 files with test failures on linux-x64:

  • test/js/third_party/fsevents/fsevents.test.ts

#c07180a368f1ac203a73157cf3dfbdef341a631a

@github-actions
Contributor

github-actions bot commented Aug 18, 2023

@cirospaciari 8 files with test failures on bun-darwin-aarch64:

  • test/js/bun/spawn/spawn.test.ts
  • test/js/bun/test/test-test.test.ts
  • test/js/node/dns/node-dns.test.js
  • test/js/node/fs/fs.test.ts
  • test/js/node/watch/fs.watch.test.ts
  • test/js/third_party/fsevents/fsevents.test.ts
  • test/js/web/fetch/fetch.stream.test.ts
  • test/js/web/worker.test.ts

#c07180a368f1ac203a73157cf3dfbdef341a631a

@github-actions
Contributor

github-actions bot commented Aug 18, 2023

@cirospaciari 5 files with test failures on bun-darwin-x64-baseline:

  • test/js/bun/spawn/spawn-streaming-stdin.test.ts
  • test/js/bun/sqlite/sqlite.test.js
  • test/js/node/fs/fs.test.ts
  • test/js/third_party/webpack/webpack.test.ts
  • test/js/web/timers/setTimeout.test.js

#c07180a368f1ac203a73157cf3dfbdef341a631a

@Jarred-Sumner Jarred-Sumner marked this pull request as draft August 18, 2023 02:54
@CodeFromAnywhere

GREAT WORK! Looking forward to this one.

@cirospaciari cirospaciari force-pushed the ciro/fetch-streaming branch 2 times, most recently from 0757809 to 86f0dd1 Compare August 19, 2023 01:26
@cirospaciari cirospaciari changed the title WIP: fetch(stream) add stream support for compressed and uncompressed data fetch(stream) add stream support for compressed and uncompressed data Aug 19, 2023
@cirospaciari cirospaciari marked this pull request as ready for review August 19, 2023 02:37
@@ -73,6 +73,8 @@ pub fn UnboundedQueue(comptime T: type, comptime next_field: meta.FieldEnum(T))
}

pub fn pop(self: *Self) ?*T {
if (self.isEmpty()) return null;
Collaborator

What happens if count is incremented after isEmpty() returns true?

Why isn't it sufficient for front to be null and then return?

Collaborator Author

I think I can revert this. It only evades the problem; there is still a race. For some reason we are getting an assertion failure on assert(@atomicRmw(usize, &self.count, .Sub, 1, .Monotonic) >= 1);

image

Collaborator

Maybe we should use popBatch

pub fn size(this: *const Value) Blob.SizeType {
return switch (this.*) {
.Blob => this.Blob.size,
.InternalBlob => @as(Blob.SizeType, @truncate(this.InternalBlob.sliceConst().len)),
.WTFStringImpl => @as(Blob.SizeType, @truncate(this.WTFStringImpl.utf8ByteLength())),
.Locked => {
Collaborator

Suggested change
.Locked => {
.Locked => this._lockedSize()

Collaborator Author

done in 4676cfe

@@ -619,23 +621,32 @@ pub const Fetch = struct {
javascript_vm: *VirtualMachine = undefined,
global_this: *JSGlobalObject = undefined,
request_body: HTTPRequestBody = undefined,
// buffer being used by AsyncHTTP
Collaborator

use /// because // doesn't show up in autocomplete

Collaborator Author

fixed 4676cfe

return;
}
} else {
this.response.?.body.value.Locked.size_hint = @as(u52, @intCast(this.body_size));
Collaborator

is the pointer aliasing here intentional?

Collaborator Author

fixed 4676cfe

return;
}

if (this.response) |response| {
Collaborator

What is keeping this response alive? How can we be sure that the garbage collector doesn't free the Response object in the middle of streaming?

Collaborator Author

fixed 4676cfe

@Jarred-Sumner
Collaborator

I think this needs about two dozen more tests

@Jarred-Sumner
Collaborator

looks like macOS build fails

@@ -1181,7 +1181,7 @@ WebCore::FetchHeaders* WebCore__FetchHeaders__createFromPicoHeaders_(const void*

for (size_t j = 0; j < end; j++) {
PicoHTTPHeader header = pico_headers.ptr[j];
-if (header.value.len == 0)
+if (header.value.len == 0 || header.name.len == 0)
Collaborator

interesting

@@ -3794,6 +3794,10 @@ pub const Blob = struct {
} else {
return build.blob.dupe();
}
} else if (current.toSliceOrNull(global)) |sliced| {
if (sliced.allocator.get()) |allocator| {
return Blob.initWithAllASCII(bun.constStrToU8(sliced.slice()), allocator, global, false);
Collaborator

This should be cloned. There are too many cases where we assume Blob is always owned by mimalloc, but in this case it might not be.

Collaborator

Also, if you call .arrayBuffer() on a Response, it will transfer the buffer, which would mean that the string is now mutable. That will break an unknown number of things.

Collaborator Author

Collaborator

toSlice does not clone if it's an ASCII latin1 string. toOwnedSlice clones it.

Collaborator Author

fixed on ef6989f
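
The ownership concern in this thread can be illustrated with a small TypeScript sketch. It uses plain typed arrays and assumes nothing about Bun's Blob or string internals; the variable names are illustrative only. A view created over an existing buffer shares memory with it, while a copy does not.

```typescript
// Illustrative only: plain typed arrays, not Bun's Blob or string internals.
const original = new Uint8Array([104, 105]); // bytes for "hi"

// A second view over the same ArrayBuffer shares memory (no clone).
const borrowed = new Uint8Array(original.buffer);

// slice() allocates a new buffer and copies the bytes (a clone).
const cloned = original.slice();

// Writing through the borrowed view changes what `original` sees...
borrowed[0] = 72;

// ...but the clone keeps the old byte.
const originalChanged = original[0] === 72;
const cloneUnchanged = cloned[0] === 104;
```

If the borrowed bytes backed a string that the rest of the runtime treats as immutable, a write like the one above is the kind of surprise the review is warning about.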

// .InlineBlob => @truncate(Blob.SizeType, this.InlineBlob.sliceConst().len),
else => 0,
};
}

pub fn estimatedSize(this: *const Value) usize {
return switch (this.*) {
.Blob => this.Blob.size,
Collaborator

This is not correct. Blob.size defaults to a max value which is an absurdly large number to indicate that it is not a valid number. This would need to call resolveSize which would be too expensive to call here because it potentially does stat() on a file in the filesystem.

Collaborator Author

fixed on ef6989f
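
One way to read the concern above, sketched in TypeScript. The sentinel value (the largest 52-bit unsigned integer) and the fallback of 0 are assumptions made for illustration, and the function name is hypothetical; the actual fix in ef6989f may differ.

```typescript
// Assumption: "unknown size" is encoded as the largest 52-bit unsigned value.
const UNKNOWN_SIZE = 2 ** 52 - 1;

// Hypothetical helper: never report the sentinel as a real byte count.
function estimatedSizeOf(blobSize: number): number {
  // Resolving the real size could require a stat() call, so this sketch
  // reports 0 for unresolved sizes instead of doing that work.
  return blobSize === UNKNOWN_SIZE ? 0 : blobSize;
}

const resolved = estimatedSizeOf(1024);
const unresolved = estimatedSizeOf(UNKNOWN_SIZE);
```

Without the sentinel check, the unresolved case would report an estimate of several petabytes.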

}

fn getSizeHint(this: *FetchTasklet) u52 {
if (this.body_size == .content_length) {
Collaborator

it would be more idiomatic Zig for this to be a switch statement

Collaborator Author

fixed on ef6989f

};
}

fn getSizeHint(this: *FetchTasklet) u52 {
Collaborator

Can you use Blob.SizeType instead of directly using u52? I think that makes it more clear that it is consistent

Collaborator Author

fixed on ef6989f


fn getSizeHint(this: *FetchTasklet) u52 {
if (this.body_size == .content_length) {
return @as(u52, @intCast(this.body_size.content_length));
Collaborator

This should be @truncate, and the @as is unnecessary.

Collaborator Author

fixed on ef6989f

@@ -830,7 +1022,10 @@ pub const Fetch = struct {
const allocator = bun.default_allocator;
var response = allocator.create(Response) catch unreachable;
response.* = this.toResponse(allocator);
-return Response.makeMaybePooled(@as(js.JSContextRef, @ptrCast(this.global_this)), response);
+const response_js = Response.makeMaybePooled(@as(js.JSContextRef, @ptrCast(this.global_this)), response);
Collaborator

the @ptrCast here does nothing btw

Collaborator Author

fixed on ef6989f

.err = Syscall.Error{
.errno = @as(Syscall.Error.Int, @intCast(-completion.result)),
.syscall = .read,
.err = .{
Collaborator

err.err is kinda weird

Collaborator Author

yeah will rename to sys_err

}
}
gcTick(false);
expect(buffer.byteLength).toBe(content.byteLength);
Collaborator

This only checks the length. It needs to check the value too. expect(buffer).toEqual(content)

Collaborator Author

Will add it

Collaborator Author

fixed on 70d8e69
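
A minimal TypeScript sketch of why the length check alone is not enough. The bytesEqual helper and the sample data are hypothetical and are not taken from the PR's test file; they only show that two buffers can have the same byteLength while holding different bytes.

```typescript
// Hypothetical helper for illustration; the real test would use expect(...).toEqual(...).
function bytesEqual(a: Uint8Array, b: Uint8Array): boolean {
  if (a.byteLength !== b.byteLength) return false;
  for (let i = 0; i < a.byteLength; i++) {
    if (a[i] !== b[i]) return false;
  }
  return true;
}

const content = new Uint8Array([1, 2, 3, 4]);
// Same length, but one byte was corrupted in transit.
const corrupted = new Uint8Array([1, 2, 9, 4]);

// A length-only assertion would pass here...
const sameLength = corrupted.byteLength === content.byteLength;
// ...while a value comparison catches the corruption.
const sameBytes = bytesEqual(corrupted, content);
```

A streaming bug that reorders or drops and re-pads chunks would slip past the length check but fail the value check.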

@Jarred-Sumner
Collaborator

Thank you for writing the tests. I think we can ship this

Collaborator

@Jarred-Sumner Jarred-Sumner left a comment

any idea why the fsevents test is failing?

@@ -545,8 +545,7 @@ pub const EventLoop = struct {
},
.FetchTasklet => {
var fetch_task: *Fetch.FetchTasklet = task.get(Fetch.FetchTasklet).?;
-fetch_task.onDone();
-fetch_task.deinit();
+fetch_task.onProgressUpdate();
Collaborator

when does the task itself get freed?

Collaborator Author

@cirospaciari cirospaciari Aug 22, 2023

It deinits itself after receiving has_more == false, after the clearData calls.

Collaborator

but the Tasklet part. The thing which is queued to the tasks queue.

Collaborator Author

but the Tasklet part. The thing which is queued to the tasks queue.

Oh, I see, dumb mistake. Let me fix it ASAP.

Collaborator Author

Actually, only one task is allocated, and it is freed on the last callback. We use task.javascript_vm.eventLoop().enqueueTaskConcurrent(task.concurrent_task.from(task, .manual_deinit)); so the same task pointer is reused on each callback and does not need to be freed each time, only on the last one. The deinit calls are:

this.deinit();

this.deinit();

this.deinit();
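
The lifetime described above can be sketched roughly in TypeScript. The class, field, and method names below are hypothetical stand-ins rather than the Zig implementation; the sketch only shows a single task object being enqueued repeatedly and releasing itself once, on the callback where hasMore is false.

```typescript
// Hypothetical stand-in for the tasklet: one object, reused for every callback.
class StreamingTask {
  chunksSeen = 0;
  deinitCount = 0;

  // Called once per progress callback; frees the task only on the last one.
  onProgressUpdate(hasMore: boolean): void {
    this.chunksSeen += 1;
    if (!hasMore) this.deinit();
  }

  private deinit(): void {
    this.deinitCount += 1;
  }
}

const task = new StreamingTask();

// The queue holds callbacks that all point at the same task object,
// mirroring the "manual deinit" idea: dequeuing does not free the task.
const queue: Array<() => void> = [];
for (const hasMore of [true, true, false]) {
  queue.push(() => task.onProgressUpdate(hasMore));
}
for (const run of queue) run();
```

With this shape the queue never owns the task, so there is nothing for it to free when an entry is dequeued.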

@cirospaciari
Collaborator Author

any idea why the fsevents test is failing?

No idea, this popped up after the last rebase.

@@ -525,9 +525,19 @@ pub const StreamResult = union(Tag) {
into_array: IntoArray,
into_array_and_done: IntoArray,
pending: *Pending,
-err: Syscall.Error,

+err: union(Err) {
Collaborator

maybe do

Error: Syscall.Error
JSValue: JSC.JSValue

That JSValue version also needs to be kept alive somehow.

Collaborator Author

Fixed on 4d07c98
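
The suggested shape, a tagged union with a syscall error variant and a JavaScript value variant, can be sketched in TypeScript as a discriminated union. All type, field, and function names here are illustrative assumptions, not the Zig definitions from the PR, and the sketch does not address keeping the JS value alive.

```typescript
// Illustrative types only; the Zig union in the PR is the source of truth.
type SyscallError = { errno: number; syscall: string };

type StreamError =
  | { tag: "Error"; value: SyscallError }
  | { tag: "JSValue"; value: unknown };

// Hypothetical formatter showing that each variant is handled explicitly.
function formatStreamError(err: StreamError): string {
  switch (err.tag) {
    case "Error":
      return `${err.value.syscall} failed with errno ${err.value.errno}`;
    case "JSValue":
      return `rejected with ${String(err.value)}`;
  }
}

const fromSyscall = formatStreamError({ tag: "Error", value: { errno: 104, syscall: "read" } });
const fromJs = formatStreamError({ tag: "JSValue", value: "aborted" });
```

Tagging the variants keeps consumers from treating a JS rejection value as if it carried an errno.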

@Jarred-Sumner Jarred-Sumner merged commit 9027484 into main Aug 22, 2023
15 of 20 checks passed
@Jarred-Sumner Jarred-Sumner deleted the ciro/fetch-streaming branch August 22, 2023 03:30
@Jarred-Sumner
Collaborator

🥳 🥳 🥳
