Skip to content

Commit

Permalink
Progress callbacks fire for all meta-request types
Browse files Browse the repository at this point in the history
  • Loading branch information
graebm committed Aug 23, 2023
1 parent a691a2f commit ba1d840
Show file tree
Hide file tree
Showing 14 changed files with 418 additions and 200 deletions.
56 changes: 52 additions & 4 deletions include/aws/s3/private/s3_meta_request_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,25 @@ struct aws_s3_prepare_request_payload {
void *user_data;
};

/* An event to be delivered on the meta-request's io_event_loop thread. */
struct aws_s3_meta_request_event {
enum aws_s3_meta_request_event_type {
AWS_S3_META_REQUEST_EVENT_RESPONSE_BODY, /* body_callback */
AWS_S3_META_REQUEST_EVENT_PROGRESS, /* progress_callback */
/* TODO: AWS_S3_META_REQUEST_EVENT_TELEMETRY */
} type;

union {
struct {
struct aws_s3_request *request;
} response_body;

struct {
struct aws_s3_meta_request_progress info;
} progress;
} u;
};

struct aws_s3_meta_request_vtable {
/* Update the meta request. out_request is required to be non-null. Returns true if there is any work in
* progress, false if there is not. */
Expand Down Expand Up @@ -179,11 +198,19 @@ struct aws_s3_meta_request {
* failed.)*/
uint32_t num_parts_delivery_completed;

/* Number of parts that have been successfully delivered to the caller. */
uint32_t num_parts_delivery_succeeded;
/* Task for delivering events on the meta-request's io_event_loop thread.
* We do this to ensure a meta-request's callbacks are fired sequentially and non-overlapping.
* If `event_delivery_array` has items in it, then this task is scheduled.
* If `event_delivery_active` is true, then this task is actively running.
* Delivery is not 100% complete until `event_delivery_array` is empty AND `event_delivery_active` is false
* (use aws_s3_meta_request_are_events_out_for_delivery_synced() to check) */
struct aws_task event_delivery_task;

/* Array of `struct aws_s3_meta_request_event` to deliver when the `event_delivery_task` runs. */
struct aws_array_list event_delivery_array;

/* Number of parts that have failed while trying to be delivered to the caller. */
uint32_t num_parts_delivery_failed;
/* When true, events are actively being delivered to the user. */
bool event_delivery_active;

/* The end finish result of the meta request. */
struct aws_s3_meta_request_result finish_result;
Expand All @@ -205,6 +232,14 @@ struct aws_s3_meta_request {

} client_process_work_threaded_data;

/* Anything in this structure should only ever be accessed by the meta-request from its io_event_loop thread. */
struct {
/* When delivering events, we swap contents with `synced_data.event_delivery_array`.
* This is an optimization, we could have just copied the array when the task runs,
* but swapping two array-lists back and forth avoids an allocation. */
struct aws_array_list event_delivery_array;
} io_threaded_data;

const bool should_compute_content_md5;

/* deep copy of the checksum config. */
Expand Down Expand Up @@ -316,6 +351,19 @@ void aws_s3_meta_request_stream_response_body_synced(
struct aws_s3_meta_request *meta_request,
struct aws_s3_request *request);

/* Add an event for delivery on the meta-request's io_event_loop thread.
* These events usually correspond to callbacks that must fire sequentially and non-overlapping,
* such as delivery of a part's response body. */
AWS_S3_API
void aws_s3_meta_request_add_event_for_delivery_synced(
struct aws_s3_meta_request *meta_request,
const struct aws_s3_meta_request_event *event);

/* Returns whether any events are out for delivery.
* The meta-request's finish callback must not be invoked until this returns false. */
AWS_S3_API
bool aws_s3_meta_request_are_events_out_for_delivery_synced(struct aws_s3_meta_request *meta_request);

/* Asynchronously read from the meta request's input stream. Should always be done outside of any mutex,
* as reading from the stream could cause user code to call back into aws-c-s3.
* This will fill the buffer to capacity, unless end of stream is reached.
Expand Down
4 changes: 2 additions & 2 deletions include/aws/s3/private/s3_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,10 @@ AWS_S3_API
void aws_s3_request_clean_up_send_data(struct aws_s3_request *request);

AWS_S3_API
void aws_s3_request_acquire(struct aws_s3_request *request);
struct aws_s3_request *aws_s3_request_acquire(struct aws_s3_request *request);

AWS_S3_API
void aws_s3_request_release(struct aws_s3_request *request);
struct aws_s3_request *aws_s3_request_release(struct aws_s3_request *request);

AWS_S3_API
struct aws_s3_request_metrics *aws_s3_request_metrics_new(
Expand Down
7 changes: 6 additions & 1 deletion include/aws/s3/s3_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,11 @@ struct aws_s3_meta_request_progress {
};

/**
* Invoked to report progress of multi-part upload and copy object requests.
* Invoked to report progress of a meta-request.
* For PutObject, progress refers to bytes uploaded.
* For CopyObject, progress refers to bytes copied.
* For GetObject, progress refers to bytes downloaded.
* For anything else, progress refers to response bytes received.
*/
typedef void(aws_s3_meta_request_progress_fn)(
struct aws_s3_meta_request *meta_request,
Expand Down Expand Up @@ -534,6 +538,7 @@ struct aws_s3_meta_request_options {

/**
* Invoked to report progress of the meta request execution.
* See `aws_s3_meta_request_progress_fn`.
*/
aws_s3_meta_request_progress_fn *progress_callback;

Expand Down
16 changes: 16 additions & 0 deletions source/s3_auto_ranged_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,10 @@ static bool s_s3_auto_ranged_get_update(
}

no_work_remaining:
/* If some events are still being delivered to caller, then wait for those to finish */
if (!work_remaining && aws_s3_meta_request_are_events_out_for_delivery_synced(meta_request)) {
work_remaining = true;
}

if (!work_remaining) {
aws_s3_meta_request_set_success_synced(meta_request, s_s3_auto_ranged_get_success_status(meta_request));
Expand Down Expand Up @@ -695,6 +699,18 @@ static void s_s3_auto_ranged_get_request_finished(
}
++auto_ranged_get->synced_data.num_parts_successful;

/* Send progress_callback for delivery on io_event_loop thread */
if (meta_request->progress_callback != NULL) {
struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_PROGRESS};
event.u.progress.info.bytes_transferred = request->send_data.response_body.len;
event.u.progress.info.content_length =
auto_ranged_get->synced_data.object_range_empty
? 0
: (auto_ranged_get->synced_data.object_range_end + 1 -
auto_ranged_get->synced_data.object_range_start);
aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event);
}

aws_s3_meta_request_stream_response_body_synced(meta_request, request);

AWS_LOGF_DEBUG(
Expand Down
31 changes: 24 additions & 7 deletions source/s3_auto_ranged_put.c
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,10 @@ static bool s_s3_auto_ranged_put_update(
work_remaining = true;

no_work_remaining:
/* If some events are still being delivered to caller, then wait for those to finish */
if (!work_remaining && aws_s3_meta_request_are_events_out_for_delivery_synced(meta_request)) {
work_remaining = true;
}

if (!work_remaining) {
aws_s3_meta_request_set_success_synced(meta_request, AWS_S3_RESPONSE_STATUS_SUCCESS);
Expand Down Expand Up @@ -1598,6 +1602,8 @@ static void s_s3_auto_ranged_put_request_finished(
AWS_LS_S3_META_REQUEST, "id=%p Failed to parse list parts response.", (void *)meta_request);
error_code = AWS_ERROR_S3_LIST_PARTS_PARSE_FAILED;
} else if (!has_more_results) {
uint64_t bytes_previously_uploaded = 0;

for (size_t part_index = 0;
part_index < aws_array_list_length(&auto_ranged_put->synced_data.part_list);
part_index++) {
Expand All @@ -1607,6 +1613,8 @@ static void s_s3_auto_ranged_put_request_finished(
/* Update the number of parts sent/completed previously */
++auto_ranged_put->synced_data.num_parts_started;
++auto_ranged_put->synced_data.num_parts_completed;

bytes_previously_uploaded += part->size;
}
}

Expand All @@ -1616,6 +1624,14 @@ static void s_s3_auto_ranged_put_request_finished(
(void *)meta_request,
auto_ranged_put->synced_data.num_parts_completed,
auto_ranged_put->total_num_parts_from_content_length);

/* Deliver an initial progress_callback to report all previously uploaded parts. */
if (meta_request->progress_callback != NULL && bytes_previously_uploaded > 0) {
struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_PROGRESS};
event.u.progress.info.bytes_transferred = bytes_previously_uploaded;
event.u.progress.info.content_length = auto_ranged_put->content_length;
aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event);
}
}
}

Expand Down Expand Up @@ -1742,13 +1758,6 @@ static void s_s3_auto_ranged_put_request_finished(
etag = aws_strip_quotes(meta_request->allocator, etag_within_quotes);
}
}
if (error_code == AWS_ERROR_SUCCESS && meta_request->progress_callback != NULL) {
struct aws_s3_meta_request_progress progress = {
.bytes_transferred = request->request_body.len,
.content_length = auto_ranged_put->content_length,
};
meta_request->progress_callback(meta_request, &progress, meta_request->user_data);
}
}

/* BEGIN CRITICAL SECTION */
Expand Down Expand Up @@ -1782,6 +1791,14 @@ static void s_s3_auto_ranged_put_request_finished(

++auto_ranged_put->synced_data.num_parts_successful;

/* Send progress_callback for delivery on io_event_loop thread */
if (meta_request->progress_callback != NULL) {
struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_PROGRESS};
event.u.progress.info.bytes_transferred = request->request_body.len;
event.u.progress.info.content_length = auto_ranged_put->content_length;
aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event);
}

/* Store part's ETag */
struct aws_s3_mpu_part_info *part = NULL;
aws_array_list_get_at(&auto_ranged_put->synced_data.part_list, &part, part_index);
Expand Down
10 changes: 4 additions & 6 deletions source/s3_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1481,8 +1481,7 @@ static void s_s3_client_prepare_callback_queue_request(
request_is_noop = request->is_noop != 0;
s_s3_client_meta_request_finished_request(client, meta_request, request, error_code);

aws_s3_request_release(request);
request = NULL;
request = aws_s3_request_release(request);
}

/* BEGIN CRITICAL SECTION */
Expand Down Expand Up @@ -1522,8 +1521,7 @@ void aws_s3_client_update_connections_threaded(struct aws_s3_client *client) {
if (!request->always_send && aws_s3_meta_request_has_finish_result(request->meta_request)) {
s_s3_client_meta_request_finished_request(client, request->meta_request, request, AWS_ERROR_S3_CANCELED);

aws_s3_request_release(request);
request = NULL;
request = aws_s3_request_release(request);
} else if (
s_s3_client_get_num_requests_network_io(client, request->meta_request->type) < max_active_connections) {
s_s3_client_create_connection_for_request(client, request);
Expand Down Expand Up @@ -1856,8 +1854,8 @@ void aws_s3_client_notify_connection_finished(
}

if (connection->request != NULL) {
aws_s3_request_release(connection->request);
connection->request = NULL;

connection->request = aws_s3_request_release(connection->request);
}

aws_retry_token_release(connection->retry_token);
Expand Down
23 changes: 19 additions & 4 deletions source/s3_copy_object.c
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,10 @@ static bool s_s3_copy_object_update(
work_remaining = true;

no_work_remaining:
/* If some events are still being delivered to caller, then wait for those to finish */
if (!work_remaining && aws_s3_meta_request_are_events_out_for_delivery_synced(meta_request)) {
work_remaining = true;
}

if (!work_remaining) {
aws_s3_meta_request_set_success_synced(meta_request, AWS_S3_RESPONSE_STATUS_SUCCESS);
Expand Down Expand Up @@ -640,6 +644,15 @@ static void s_s3_copy_object_request_finished(

/* Signals completion of the meta request */
if (error_code == AWS_ERROR_SUCCESS) {

/* Send progress_callback for delivery on io_event_loop thread */
if (meta_request->progress_callback != NULL) {
struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_PROGRESS};
event.u.progress.info.bytes_transferred = copy_object->synced_data.content_length;
event.u.progress.info.content_length = copy_object->synced_data.content_length;
aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event);
}

copy_object->synced_data.copy_request_bypass_completed = true;
} else {
/* Bypassed CopyObject request failed */
Expand Down Expand Up @@ -720,11 +733,13 @@ static void s_s3_copy_object_request_finished(
AWS_ASSERT(etag != NULL);

++copy_object->synced_data.num_parts_successful;

/* Send progress_callback for delivery on io_event_loop thread. */
if (meta_request->progress_callback != NULL) {
struct aws_s3_meta_request_progress progress = {
.bytes_transferred = copy_object->synced_data.part_size,
.content_length = copy_object->synced_data.content_length};
meta_request->progress_callback(meta_request, &progress, meta_request->user_data);
struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_PROGRESS};
event.u.progress.info.bytes_transferred = copy_object->synced_data.part_size;
event.u.progress.info.content_length = copy_object->synced_data.content_length;
aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event);
}

struct aws_s3_mpu_part_info *part = NULL;
Expand Down
23 changes: 23 additions & 0 deletions source/s3_default_meta_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ static bool s_s3_meta_request_default_update(
work_remaining = true;

no_work_remaining:
/* If some events are still being delivered to caller, then wait for those to finish */
if (!work_remaining && aws_s3_meta_request_are_events_out_for_delivery_synced(meta_request)) {
work_remaining = true;
}

if (!work_remaining) {
aws_s3_meta_request_set_success_synced(
Expand Down Expand Up @@ -366,6 +370,25 @@ static void s_s3_meta_request_default_request_finished(
meta_request_default->synced_data.request_error_code = error_code;

if (error_code == AWS_ERROR_SUCCESS) {
/* Send progress_callback for delivery on io_event_loop thread.
* For default meta-requests, we invoke the progress_callback once, after the sole HTTP request completes.
* This is simpler than reporting incremental progress as the response body is received,
* or the request body is streamed out, since then we'd also need to handle retries that reset
* progress back to 0% (our existing API only lets us report forward progress). */
if (meta_request->progress_callback != NULL) {
struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_PROGRESS};
if (meta_request->type == AWS_S3_META_REQUEST_TYPE_PUT_OBJECT) {
/* For uploads, report request body size */
event.u.progress.info.bytes_transferred = request->request_body.len;
event.u.progress.info.content_length = request->request_body.len;
} else {
/* For anything else, report response body size */
event.u.progress.info.bytes_transferred = request->send_data.response_body.len;
event.u.progress.info.content_length = request->send_data.response_body.len;
}
aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event);
}

aws_s3_meta_request_stream_response_body_synced(meta_request, request);
} else {
aws_s3_meta_request_set_fail_synced(meta_request, request, error_code);
Expand Down
Loading

0 comments on commit ba1d840

Please sign in to comment.