Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Offline Nodes] Adds new library for offline tasks #13574

Merged
merged 35 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
bf65b92
[Offline Nodes] Adds new library offline tasks
linuxpi May 7, 2024
1742088
add documentation & TaskWorker interface
linuxpi May 7, 2024
2efd394
add docstring for offline-tasks library
linuxpi May 7, 2024
d0b4983
add TaskWorkerPlugin interface
linuxpi May 7, 2024
152e28e
fix missingjavadoc issue
linuxpi May 8, 2024
dbf97ca
move plugin interfaces to :server plugins namespace
linuxpi May 9, 2024
d6eadca
add changelog
linuxpi May 9, 2024
a2beb11
update changelog
linuxpi May 9, 2024
a304615
Merge branch 'main' into offline-tasks-library
linuxpi May 9, 2024
9b19227
add missing javaDocStrings
linuxpi May 9, 2024
804fcb1
Merge branch 'main' into offline-tasks-library
linuxpi May 22, 2024
9e48827
Add TaskStatus attribute of a Task and update CHANGELOG
linuxpi May 22, 2024
806edc8
Refactor list tasks : Add dedicated TaskListQueryParams
linuxpi May 22, 2024
25fa935
change claimTask signature to return boolean
linuxpi May 22, 2024
a330bfd
add new TaskStatus - UNASSIGNED, ASSIGNED, CANCELLED
linuxpi May 31, 2024
7c08662
add getTasks contract to accept TaskType as well
linuxpi May 31, 2024
34094a5
add builder for Task and add timestamp related attributes
linuxpi May 31, 2024
a0cbd97
add WorkerNode to Task to add assigned node information and provision…
linuxpi May 31, 2024
aceb74d
refactor TaskClient into TaskProducerClient and TaskManagerClient
linuxpi May 31, 2024
f50b167
refactor list tasks api
linuxpi May 31, 2024
8bfcb40
rename all occurences of claim to assign
linuxpi Jun 13, 2024
42f9e73
add support to list task assignments from a separate store other than…
linuxpi Jun 13, 2024
bd2db78
Rename to task commmons
Bukhtawar Jun 13, 2024
8e3385c
fix spotless findings
linuxpi Jun 13, 2024
1bd0b3b
separate out Task Clients based on different actors
linuxpi Jun 13, 2024
cc92cca
add missing docstring
linuxpi Jun 13, 2024
09d6965
Merge branch 'main' into offline-tasks-library
linuxpi Jul 31, 2024
37d81b1
add Tests for Task
linuxpi Aug 1, 2024
ec76335
Merge branch 'main' into offline-tasks-library
linuxpi Aug 1, 2024
6470881
Merge branch 'main' into offline-tasks-library
linuxpi Aug 4, 2024
3a84e95
support to update task status via Builder
linuxpi Aug 5, 2024
a1e3bc3
add tests for WorkerNode and TaskId
linuxpi Aug 6, 2024
21abf2a
reword to remove offline keyword
linuxpi Aug 7, 2024
cf26f01
gaurd plugin registration behind feature flag
linuxpi Aug 9, 2024
9a87245
Merge branch 'main' into offline-tasks-library
linuxpi Aug 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add latency metrics for instrumenting critical clusterManager code paths ([#12333](https://github.com/opensearch-project/OpenSearch/pull/12333))
- Add support for Azure Managed Identity in repository-azure ([#12423](https://github.com/opensearch-project/OpenSearch/issues/12423))
- Add useCompoundFile index setting ([#13478](https://github.com/opensearch-project/OpenSearch/pull/13478))
- [Offline Nodes] Adds offline-tasks library containing various interfaces to be used for Offline Background Tasks. ([#13574](https://github.com/opensearch-project/OpenSearch/pull/13574))
sachinpkale marked this conversation as resolved.
Show resolved Hide resolved
- Make outbound side of transport protocol dependent ([#13293](https://github.com/opensearch-project/OpenSearch/pull/13293))

### Dependencies
Expand Down
25 changes: 25 additions & 0 deletions libs/offline-tasks/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

dependencies {
api project(':libs:opensearch-common')

testImplementation "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}"
testImplementation "junit:junit:${versions.junit}"
testImplementation "org.hamcrest:hamcrest:${versions.hamcrest}"
testImplementation(project(":test:framework")) {
exclude group: 'org.opensearch', module: 'opensearch-offline-tasks'
}
}

tasks.named('forbiddenApisMain').configure {
replaceSignatureFiles 'jdk-signatures'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.offline_tasks.client;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.offline_tasks.task.Task;
import org.opensearch.offline_tasks.task.TaskId;
import org.opensearch.offline_tasks.task.TaskStatus;

import java.util.List;

/**
* Client used to interact with Task Store/Queue
*
* @opensearch.experimental
*/
@ExperimentalApi
public interface TaskClient {

/**
* Submit a new task to TaskStore/Queue
*
* @param task Task to be submitted for execution on offline nodes
*/
void submitTask(Task task);

/**
* Claim task from TaskStore/Queue. This ensures no 2 Offline Nodes work on the same task.
*
* @param taskId TaskId of the task to be claimed
* @return true if task is claimed successfully, false otherwise
*/
boolean claimTask(TaskId taskId);
sachinpkale marked this conversation as resolved.
Show resolved Hide resolved

/**
* Get task from TaskStore/Queue
*
* @param taskId TaskId of the task to be retrieved
* @return Task corresponding to TaskId
*/
Task getTask(TaskId taskId);

/**
* Update task in TaskStore/Queue
*
* @param task Task to be updated
*/
void updateTask(Task task);

/**
* Mark task as cancelled.
* Ongoing Tasks can be cancelled as well if the corresponding worker supports cancellation
*
* @param taskId TaskId of the task to be cancelled
*/
void cancelTask(TaskId taskId);

/**
* List all tasks with a particular {@param taskStatus} considering {@param listTaskStatus}
* @param taskStatus status of the tasks to be listed
* @param taskListQueryParams params to filter the tasks to be listed
* @return list of all the task matching the taskStatus
*/
List<Task> getTasks(TaskStatus taskStatus, TaskListQueryParams taskListQueryParams);
sachinpkale marked this conversation as resolved.
Show resolved Hide resolved

/**
* List all tasks considering {@param listTaskStatus}
* @param taskListQueryParams params to filter the tasks to be listed
* @return list of all the task matching the taskStatus
*/
List<Task> getTasks(TaskListQueryParams taskListQueryParams);
sachinpkale marked this conversation as resolved.
Show resolved Hide resolved

/**
* Sends task heart beat to Task Store/Queue
*
* @param taskId TaskId of Task to send heartbeat for
* @param timestamp timestamp of heartbeat to be recorded in TaskStore/Queue
*/
void sendTaskHeartbeat(TaskId taskId, long timestamp);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.offline_tasks.client;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.offline_tasks.task.TaskStatus;

/**
* Holder interface for various params different implementations might add for list calls
* A typical example would be pagination related params
*
* Since Each implementation of this abstraction can host multiple different params,
* it is recommended to use Builder Pattern for construction of the objects
*/
@ExperimentalApi
public abstract class TaskListQueryParams {

/**
* Depicts the start page number for the list call.
*
* @see TaskClient#getTasks(TaskStatus, TaskListQueryParams)
*/
private final int startPageNumber;

/**
* Depicts the page size for the list call.
*
* @see TaskClient#getTasks(TaskStatus, TaskListQueryParams)
*/
private final int pageSize;

/**
* Constructor for ListTaskParams
* @param startPageNumber start page number for the list call
* @param pageSize page size for the list call
*/
private TaskListQueryParams(int startPageNumber, int pageSize) {
this.startPageNumber = startPageNumber;
this.pageSize = pageSize;
}

/**
* Get pageSize
* @return pageSize for the list call
*/
public int getPageSize() {
return pageSize;
}

/**
* Get startPageNumber
* @return startPageNumber for the list call
*/
public int getStartPageNumber() {
return startPageNumber;
}

/**
* Builder class for ListTaskParams. Since this class can grow and host multiple params
*/
public abstract static class Builder {
/**
* Depicts the start page number for the list call.
*
* @see TaskClient#getTasks(TaskStatus, TaskListQueryParams)
*/
private int startPageNumber = 1;

/**
* Depicts the page size for the list call.
*
* @see TaskClient#getTasks(TaskStatus, TaskListQueryParams)
*/
private int pageSize = 50;

public abstract TaskListQueryParams build();

public Builder setStartPageNumber(int startPageNumber) {
this.startPageNumber = startPageNumber;
return this;
}

public Builder setPageSize(int pageSize){
this.pageSize = pageSize;
return this;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Contains task client related classes
*/
package org.opensearch.offline_tasks.client;
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Contains offline tasks related classes
*/
package org.opensearch.offline_tasks;
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.offline_tasks.task;

import org.opensearch.common.annotation.ExperimentalApi;

/**
* A Background Task to be run on Offline Node.
*/
@ExperimentalApi
public class Task {
linuxpi marked this conversation as resolved.
Show resolved Hide resolved

/**
* Task identifier used to uniquely identify a Task
*/
private final TaskId taskId;

/**
* Depicts latest state of the Task
*/
private final TaskStatus taskStatus;

/**
* Various params to used for Task execution
*/
private final TaskParams params;

/**
* Type/Category of the Task
*/
private final TaskType taskType;

/**
* Constructor for Task
*
* @param taskId Task identifier
* @param taskStatus Task status
* @param params Task Params
* @param taskType Task Type
*/
public Task(TaskId taskId, TaskStatus taskStatus, TaskParams params, TaskType taskType) {
this.taskId = taskId;
sachinpkale marked this conversation as resolved.
Show resolved Hide resolved
this.taskStatus = taskStatus;
this.params = params;
this.taskType = taskType;
}

/**
* Get TaskId
* @return TaskId
*/
public TaskId getTaskId() {
return taskId;
}

/**
* Get TaskStatus
* @return TaskStatus
*/
public TaskStatus getTaskStatus() {
return taskStatus;
}

/**
* Get TaskParams
* @return TaskParams
*/
public TaskParams getParams() {
return params;
}

/**
* Get TaskType
* @return TaskType
*/
public TaskType getTaskType() {
return taskType;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.offline_tasks.task;

import org.opensearch.common.annotation.ExperimentalApi;

/**
* Class encapsulating Task identifier
*/
@ExperimentalApi
public class TaskId {

/**
* Identified of the Task
*/
private final String id;
sachinpkale marked this conversation as resolved.
Show resolved Hide resolved

/**
* Constructor to initialize TaskId
* @param id String value of Task id
*/
public TaskId(String id) {
this.id = id;
}

/**
* Get id value
* @return id
*/
public String getValue() {
return id;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.offline_tasks.task;

import org.opensearch.common.annotation.ExperimentalApi;

/**
* Base class for all TaskParams implementation of various TaskTypes
*/
@ExperimentalApi
public abstract class TaskParams {

/**
* Default constructor
*/
public TaskParams() {}
}
Loading
Loading