Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix watcher not fully paginating on Init #1525

Merged
merged 2 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions kube-runtime/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -499,9 +499,12 @@ where
last_bookmark,
});
}
if let Some(resource_version) = last_bookmark {
// we have drained the last page - move on to next stage
return (Some(Ok(Event::InitDone)), State::InitListed { resource_version });
// check if we need to perform more pages
if continue_token.is_none() {
if let Some(resource_version) = last_bookmark {
// we have drained the last page - move on to next stage
return (Some(Ok(Event::InitDone)), State::InitListed { resource_version });
}
}
let mut lp = wc.to_list_params();
lp.continue_token = continue_token;
Expand Down
26 changes: 24 additions & 2 deletions kube/src/mock_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ impl Hack {
#[tokio::test]
async fn watchers_respect_pagination_limits() {
let (client, fakeserver) = testcontext();
// NB: scenario only responds responds to TWO paginated list calls with two objects
// NB: page scenario which responds to 3 paginated list calls with 3 object (one per page).
// This ensures the watcher internal paging mechanism is not bypassed
// and that each page is actually drained before starting the long watch.
let mocksrv = fakeserver.run(Scenario::PaginatedList);

let api: Api<Hack> = Api::all(client);
Expand All @@ -39,6 +41,8 @@ async fn watchers_respect_pagination_limits() {
assert_eq!(first.spec.num, 1);
let second: Hack = stream.try_next().await.unwrap().unwrap();
assert_eq!(second.spec.num, 2);
let third: Hack = stream.try_next().await.unwrap().unwrap();
assert_eq!(third.spec.num, 3);
assert!(poll!(stream.next()).is_pending());
timeout_after_1s(mocksrv).await;
}
Expand Down Expand Up @@ -117,14 +121,32 @@ impl ApiServerVerifier {
"kind": "HackList",
"apiVersion": "kube.rs/v1",
"metadata": {
"continue": "",
"continue": "second",
"resourceVersion": "2"
},
"items": [Hack::test(2)]
});
let response = serde_json::to_vec(&respdata).unwrap(); // respond as the apiserver would have
send.send_response(Response::builder().body(Body::from(response)).unwrap());
}
{
// we expect a final list GET because we included a continue token
let (request, send) = self.0.next_request().await.expect("service not called 3");
assert_eq!(request.method(), http::Method::GET);
let req_uri = request.uri().to_string();
assert!(req_uri.contains("&continue=second"));
let respdata = json!({
"kind": "HackList",
"apiVersion": "kube.rs/v1",
"metadata": {
"continue": "",
"resourceVersion": "2"
},
"items": [Hack::test(3)]
});
let response = serde_json::to_vec(&respdata).unwrap(); // respond as the apiserver would have
send.send_response(Response::builder().body(Body::from(response)).unwrap());
}
Ok(self)
}
}
Expand Down
Loading