Skip to content

Commit

Permalink
feat(edge): add support for tap filtering in edge
Browse files Browse the repository at this point in the history
This adds support for the `filter` parameter supplied via the task
definition.

Ref: LOG-19757
  • Loading branch information
mdeltito committed Apr 26, 2024
1 parent 7ad8f42 commit 1797324
Showing 1 changed file with 43 additions and 1 deletion.
44 changes: 43 additions & 1 deletion src/mezmo/remote_task_execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ struct TaskParameters {
limit: Option<isize>,
timeout_ms: Option<u64>,
component_id: Option<String>,
filter: Option<String>,
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -249,6 +250,8 @@ async fn tap(task: &Task, config: &config::api::Options) -> Result<TaskResult, E
.limit
.unwrap_or(DEFAULT_TAP_LIMIT_PER_INTERVAL);

let filter = task.task_parameters.filter.clone();

let subscription_client = connect_subscription_client(url)
.await
.map_err(|e| format!("Couldn't connect to Vector API via WebSockets: {}", e))?;
Expand All @@ -257,7 +260,7 @@ async fn tap(task: &Task, config: &config::api::Options) -> Result<TaskResult, E
let stream = subscription_client.output_events_by_component_id_patterns_subscription(
vec![component_id.to_string()],
vec![],
None,
filter,
TapEncodingFormat::Json,
limit as i64,
SUBSCRIPTION_FLUSH_INTERVAL_MS,
Expand Down Expand Up @@ -377,4 +380,43 @@ mod tests {

run_task_step(&Default::default(), &client, "token", &get_url, &post_url).await;
}

#[tokio::test]
async fn fetches_tasks_including_filters() {
let get_path = "/fake/get/url";
let post_path = "/fake/post/:task_id/url";
let server = Server::run();
server.expect(
Expectation::matching(all_of![request::method("GET"), request::path(get_path),])
.times(1)
.respond_with(json_encoded(json!({
"data": [{
"task_id": "task1",
"task_type": "tap",
"age_secs": 1,
"task_parameters": {
"component_id": "comp1",
"limit": 1,
"timeout_ms": 1000,
"filter": ".message == \"hello\""
},
}]
}))),
);

server.expect(
Expectation::matching(all_of![
request::method("POST"),
request::path("/fake/post/task1/url"),
])
.times(1)
.respond_with(status_code(200)),
);

let get_url = format!("http://{}{}", server.addr(), get_path);
let post_url = format!("http://{}{}", server.addr(), post_path);
let client = Client::new();

run_task_step(&Default::default(), &client, "token", &get_url, &post_url).await;
}
}

0 comments on commit 1797324

Please sign in to comment.