[CI] Add unittest for global_scheduler and entrypoints #12

Merged · 38 commits · Aug 23, 2024

Commits (38)
6c3d2f3
Refine manager arguments
s5u13b Aug 12, 2024
9c01a48
Further rename migration and scaling
s5u13b Aug 12, 2024
94444c0
Rename migration policy to request migration policy
s5u13b Aug 12, 2024
36a5fad
Fix launch_ray_cluster
s5u13b Aug 12, 2024
2b7aece
Rename logging to log
s5u13b Aug 12, 2024
b90201e
Rename get_log_instance_info to get_instance_info
s5u13b Aug 12, 2024
66fd3d6
Fix indent
s5u13b Aug 13, 2024
da79000
Fix llumlet failover during initialization
s5u13b Aug 13, 2024
23929e5
[WIP] Initialize llumlet in manager
s5u13b Aug 14, 2024
de971cb
Clean debugging codes
s5u13b Aug 14, 2024
e8d671f
Pass node_id of api server
s5u13b Aug 14, 2024
ff91d83
Fix typos found during manager unittest
s5u13b Aug 15, 2024
11b23b3
Add manager unittest
s5u13b Aug 15, 2024
5ab9c8f
Fix indent in migration scheduler
s5u13b Aug 15, 2024
13614de
Fix typos found during global scheduler unittest
s5u13b Aug 15, 2024
34d3a25
Add global scheduler unittest
s5u13b Aug 15, 2024
96b78b2
Fix typos found during llumnix utils unittest
s5u13b Aug 15, 2024
2e530c9
Add llumnix utils unittest
s5u13b Aug 15, 2024
6eaffa5
Fix typos
s5u13b Aug 16, 2024
12fbcc0
Merge branch 'manager_args' into unittest
s5u13b Aug 16, 2024
2a29cd4
Merge branch 'manager_args' into init-llumlet
s5u13b Aug 16, 2024
4976f39
Add assert
s5u13b Aug 16, 2024
ed7f5d0
Merge branch 'main' into init-llumlet
s5u13b Aug 16, 2024
81ffd75
Add api server unittest
s5u13b Aug 16, 2024
37c758b
Merge branch 'main' into unittest
s5u13b Aug 16, 2024
7c16933
Fix api server unittest
s5u13b Aug 16, 2024
6780570
Rename init-instance-in-manager to init-instance-by-manager
s5u13b Aug 21, 2024
242fef6
Add TODOs
s5u13b Aug 21, 2024
38d1f17
Set correct node_id for worker and recv actor
s5u13b Aug 21, 2024
64c968e
Fix pylint
s5u13b Aug 21, 2024
89ac17e
Adjust hack node_id way
s5u13b Aug 21, 2024
3120635
Merge branch 'init-llumlet' into unittest
s5u13b Aug 22, 2024
d92412b
Fix typos mentioned in CR
s5u13b Aug 22, 2024
d75fcb3
Divide dispatch unittest
s5u13b Aug 22, 2024
83a18fc
Add TODO
s5u13b Aug 22, 2024
0f17480
Handle no instance case when generate
s5u13b Aug 22, 2024
71b2e72
Add missing copyright
s5u13b Aug 22, 2024
6637f75
Rename prefill to defrag in pair migration policy
s5u13b Aug 22, 2024
30 changes: 15 additions & 15 deletions benchmark/benchmark_serving.py
@@ -31,8 +31,8 @@
from typing import List


num_finished_request = 0
server_num_request = {}
num_finished_requests = 0
server_num_requests = {}


def get_wait_time(mean_time_between_requests: float, distribution: str, coefficient_variation: float = 0.0) -> float:
@@ -76,11 +76,11 @@ async def query_model_vllm(prompt, verbose, ip_ports):
prompt, prompt_len, expected_response_len = prompt

# Round-Robin dispatch request to the given api servers.
global server_num_request
server_id = min(server_num_request, key=server_num_request.get)
server_num_request[server_id] += 1
global server_num_requests
server_id = min(server_num_requests, key=server_num_requests.get)
server_num_requests[server_id] += 1
timeout = aiohttp.ClientTimeout(total=4*60*60)
global num_finished_request
global num_finished_requests

async with aiohttp.ClientSession(timeout=timeout) as session:
# TODO(yiwang): Remove hard codes of params.
@@ -111,8 +111,8 @@ async def query_model_vllm(prompt, verbose, ip_ports):
output['response_len'] = expected_response_len
if verbose and 'generated_text' in output:
print(json.dumps(output['generated_text']))
num_finished_request += 1
print("num_finised_request: {}".format(num_finished_request))
num_finished_requests += 1
print("num_finished_requests: {}".format(num_finished_requests))
return (prompt, output)
except aiohttp.ClientError as e:
print(f"Connect to {ip_ports[server_id]} failed with: {str(e)}")
@@ -334,18 +334,18 @@ def plot_instance(log_filename_0):
log_files.sort(key=os.path.getmtime, reverse=True)
df_0 = pd.read_csv(log_files[0]).sort_values(by=["timestamp"])
timestamp_list_0 = df_0["timestamp"].to_numpy()
instance_num_list_0 = df_0["num_instance"].to_numpy()
num_instances_list_0 = df_0["num_instances"].to_numpy()
time_0 = 0
sum_0 = 0
for idx, t in enumerate(timestamp_list_0):
if t > time_0:
time_0 += 1
sum_0 += instance_num_list_0[idx]
sum_0 += num_instances_list_0[idx]
print(f"{sum_0/time_0} gpu/s")
avg_instance_num = np.round(sum_0/time_0, 2)

fig, ax = plt.subplots()
ax.plot(timestamp_list_0, instance_num_list_0, color="red", label=f"instance_num(avg {avg_instance_num} /s)")
ax.plot(timestamp_list_0, num_instances_list_0, color="red", label=f"instance_num(avg {avg_instance_num} /s)")
ax.legend(loc='upper left')
fig_filename = os.path.splitext(log_filename_0)[0] + "_instance.png"
index1 = fig_filename.rfind('/')
@@ -437,10 +437,10 @@ async def benchmark(
else:
raise ValueError(f'unknown backend {backend}')

global server_num_request
num_server = len(ip_ports)
for server_id in range(num_server):
server_num_request[server_id] = 0
global server_num_requests
num_servers = len(ip_ports)
for server_id in range(num_servers):
server_num_requests[server_id] = 0

m = MeasureLatency()

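The renamed `server_num_requests` dictionary drives the benchmark's dispatch: each request goes to the api server with the fewest requests dispatched so far, and since the counters only ever grow by one per dispatch, this cycles through the servers in round-robin order. A minimal sketch of that selection logic with hypothetical counter values, not code from this PR:

```
# Hypothetical counters; in benchmark_serving.py they start at 0 for every
# entry in ip_ports and are incremented on each dispatched request.
server_num_requests = {0: 2, 1: 1, 2: 2}

def pick_server(counters: dict) -> int:
    # Choose the server with the fewest dispatched requests; because counters
    # only ever grow by one per dispatch, this yields round-robin order.
    return min(counters, key=counters.get)

server_id = pick_server(server_num_requests)
server_num_requests[server_id] += 1
assert server_id == 1
```
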
20 changes: 12 additions & 8 deletions docs/Arguments.md
@@ -6,14 +6,15 @@ Note: since Llumnix is still in alpha stage, the interface and arguments are *su

```
usage: -m llumnix.entrypoints.vllm.api_server [-h]
[--fixed-node-init]
[--fixed-node-init-instance]
[--init-instance-by-manager]
[--initial-instances INITIAL_INSTANCES]
[--load-metric {consumed_speed,used_ratio}]
[--load-metric {remaining_steps,usage_ratio}]
[--polling-interval POLLING_INTERVAL]
[--dispatch-policy {balanced,load,queue}]
[--enable-migration]
[--pair-migration-frequency PAIR_MIGRATION_FREQUENCY]
[--pair-migration-policy {balanced,prefill_constrained,prefill_relaxed}]
[--pair-migration-policy {balanced,defrag_constrained,defrag_relaxed}]
[--migrate-out-threshold MIGRATE_OUT_THRESHOLD]
[--request-migration-policy {LCFS,SJF,LJF}]
[--enable-defrag ENABLE_DEFRAG]
@@ -35,17 +36,20 @@ usage: -m llumnix.entrypoints.vllm.api_server [-h]
[--max-stages MAX_STAGES]
```

`--fixed-node-init`
`--fixed-node-init-instance`
- Fix the placement of instance to current node.

`--init-instance-by-manager`
- initialize instance by manager.

`--initial-instances`
- Number of model instances created at initialization.
- Default: 1

`--load-metric`
- Instance load metric.
- Possible choices: consumed_speed, used_ratio
- Default: "consumed_speed"
- Possible choices: remaining_steps, usage_ratio
- Default: "remaining_steps"

`--polling-interval`
- Time interval(s) to update instance info and pair migration.
@@ -135,11 +139,11 @@ usage: -m llumnix.entrypoints.vllm.api_server [-h]
- Default: 512

`--last-stage-max-blocks`
- If the remaining blocks num < last_stage_max_blocks, do last stage migration.
- If the number of remaining blocks < last_stage_max_blocks, do last stage migration.
- Default: 4

`--max-stages`
- Drop migration if stage num > max_stages.
- Drop migration if the number of stages > max_stages.
- Default: 3

# Unsupported vLLM feature options
24 changes: 14 additions & 10 deletions llumnix/arg_utils.py
@@ -22,18 +22,19 @@
@dataclass
class EngineManagerArgs:
launch_ray_cluster: bool = True
init_instance_by_manager: bool = True
initial_instances: int = 1
fixed_node_init: bool = False
fixed_node_init_instance: bool = False

load_metric: str = 'consumed_speed'
load_metric: str = 'remaining_steps'
polling_interval: float = 0.05

dispatch_policy: str = 'load'

enable_migration: bool = True
enable_defrag: bool = True
pair_migration_frequency: int = 1
pair_migration_policy: str = 'prefill_constrained'
pair_migration_policy: str = 'defrag_constrained'
migrate_out_threshold: float = 3.0
request_migration_policy: str = 'SJF'

@@ -86,15 +87,18 @@ def from_cli_args(cls, args: argparse.Namespace) -> 'EngineManagerArgs':
# Get the list of attributes of this dataclass.
attrs = [attr.name for attr in dataclasses.fields(cls)]
# Set the attributes from the parsed arguments.
engine_args = cls(**{attr: getattr(args, attr) for attr in attrs})
return engine_args
engine_manager_args = cls(**{attr: getattr(args, attr) for attr in attrs})
return engine_manager_args

@staticmethod
def add_cli_args(
parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
parser.add_argument('--fixed-node-init',
parser.add_argument('--fixed-node-init-instance',
action='store_true',
help='fix the placement of instance to current node')
parser.add_argument('--init-instance-by-manager',
action='store_true',
help='initialize instance by manager')
parser.add_argument('--initial-instances',
type=int,
default=EngineManagerArgs.initial_instances,
@@ -103,7 +107,7 @@ def add_cli_args(
parser.add_argument('--load-metric',
type=str,
default=EngineManagerArgs.load_metric,
choices=['consumed_speed', 'used_ratio'],
choices=['remaining_steps', 'usage_ratio'],
help='instance load metric')
parser.add_argument('--polling-interval',
type=float,
@@ -126,7 +130,7 @@ def add_cli_args(
parser.add_argument('--pair-migration-policy',
type=str,
default=EngineManagerArgs.pair_migration_policy,
choices=['balanced', 'prefill_constrained', 'prefill_relaxed'],
choices=['balanced', 'defrag_constrained', 'defrag_relaxed'],
help='pair migration policy')
parser.add_argument('--migrate-out-threshold',
type=float,
@@ -203,10 +207,10 @@ def add_cli_args(
parser.add_argument('--last-stage-max-blocks',
type=int,
default=EngineManagerArgs.last_stage_max_blocks,
help='if the remain blocks num < last_stage_max_blocks, do last stage migration')
help='if the number of remaining blocks < last_stage_max_blocks, do last stage migration')
parser.add_argument('--max-stages',
type=int,
default=EngineManagerArgs.max_stages,
help='drop migration if stage num > max_stages')
help='drop migration if the number of stages > max_stages')

return parser
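
As a quick sanity check of the renamed flags, a small self-contained sketch in the same spirit as `add_cli_args` above (a simplified stand-in, not the repository's full parser; names and choices follow docs/Arguments.md after this change):

```
import argparse

parser = argparse.ArgumentParser(prog="llumnix.entrypoints.vllm.api_server")
parser.add_argument("--fixed-node-init-instance", action="store_true")
parser.add_argument("--init-instance-by-manager", action="store_true")
parser.add_argument("--load-metric", default="remaining_steps",
                    choices=["remaining_steps", "usage_ratio"])
parser.add_argument("--pair-migration-policy", default="defrag_constrained",
                    choices=["balanced", "defrag_constrained", "defrag_relaxed"])

args = parser.parse_args(["--init-instance-by-manager"])
print(args.load_metric, args.pair_migration_policy)  # remaining_steps defrag_constrained
```
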
6 changes: 4 additions & 2 deletions llumnix/backends/utils.py
@@ -35,6 +35,7 @@ def init_backend_engine(instance_id: str, backend_type: BackendType, *args, **kw
def initialize_cluster(
world_size: int = 1,
ray_address: Optional[str] = None,
detached: bool = False,
) -> Tuple[str, Optional["PlacementGroup"]]:
"""Initialize the distributed cluster probably with Ray.

@@ -55,8 +56,9 @@ def initialize_cluster(
"Ray is not installed. Please install Ray to use distributed "
"serving.")
# Connect to a ray cluster.
ray.init(address=ray_address, ignore_reinit_error=True)
ray.init(address=ray_address, ignore_reinit_error=True, namespace='llumnix')

lifetime = "detached" if detached else None
# Create placement group for worker processes
current_placement_group = ray.util.get_current_placement_group()
if current_placement_group:
@@ -84,7 +86,7 @@
# Create a new placement group
placement_group_specs = ([{"CPU": 1}] + [{"GPU": 1}] * world_size)
current_placement_group = ray.util.placement_group(
placement_group_specs, "STRICT_PACK")
placement_group_specs, "STRICT_PACK", lifetime=lifetime)
# Wait until PG is ready - this will block until all
# requested resources are available, and will timeout
# if they cannot be provisioned.
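The new `detached` flag and the explicit `llumnix` namespace let the placement group outlive the driver that created it. A rough, assumed usage sketch of that Ray behavior (not code from this PR; it only needs a local Ray runtime with one free CPU):

```
import ray
from ray.util import placement_group

ray.init(namespace="llumnix", ignore_reinit_error=True)

detached = True
lifetime = "detached" if detached else None
# A detached placement group is not destroyed when this driver exits, so a
# later driver connected to the same cluster can keep using its bundles.
pg = placement_group([{"CPU": 1}], "STRICT_PACK", lifetime=lifetime)
ray.get(pg.ready())  # blocks until the bundle can be scheduled
```
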
5 changes: 4 additions & 1 deletion llumnix/backends/vllm/executor.py
@@ -34,6 +34,8 @@
logger = init_logger(__name__)

class LlumnixRayGPUExecutor(RayGPUExecutor):
node_id: str = None

def _init_workers_ray(self, placement_group: "PlacementGroup",
**ray_remote_kwargs):
self.last_inference_latency = 0
@@ -56,6 +58,7 @@ def _init_workers_ray(self, placement_group: "PlacementGroup",

# Create the workers.
driver_ip = get_ip()
node_id = self.node_id
for rank in range(self.parallel_config.world_size):
if placement_group:
bundle = placement_group.bundle_specs[rank+1]
@@ -67,7 +70,7 @@ def _init_workers_ray(self, placement_group: "PlacementGroup",
)
else:
scheduling_strategy = NodeAffinitySchedulingStrategy(
node_id=ray.get_runtime_context().get_node_id(),
node_id=node_id,
soft=False,
)
worker = ray.remote(
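With `node_id` now taken from the class attribute rather than from `ray.get_runtime_context().get_node_id()`, the workers can be pinned to whichever node the manager chose. A minimal, illustrative sketch of that Ray scheduling pattern (not from this PR; here the target node is simply the local node):

```
import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

ray.init(ignore_reinit_error=True)

@ray.remote
class Worker:
    def where(self) -> str:
        return ray.get_runtime_context().get_node_id()

# Stand-in for the node_id handed down by the manager.
target_node_id = ray.get_runtime_context().get_node_id()
worker = Worker.options(
    scheduling_strategy=NodeAffinitySchedulingStrategy(node_id=target_node_id, soft=False)
).remote()
assert ray.get(worker.where.remote()) == target_node_id
```
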
26 changes: 17 additions & 9 deletions llumnix/backends/vllm/llm_engine.py
@@ -59,6 +59,7 @@ def from_engine_args(
usage_context: UsageContext = UsageContext.ENGINE_CONTEXT,
instance_id: str = None,
placement_group: Optional["PlacementGroup"] = None,
node_id: str = None,
latency_mem: Optional[LatencyMemData] = None
) -> "LLMEngineLlumnix":
"""Creates an LLM engine from the engine arguments."""
@@ -77,6 +78,9 @@
executor_class = LlumnixRayGPUExecutor
else:
raise ValueError('unimplemented executor backend')
# TODO(s5u13b): Do not hack here.
# Hack to pass node_id to _init_workers_ray function.
executor_class.node_id = node_id
# Create the LLM engine.
engine = cls(
instance_id=instance_id,
@@ -117,9 +121,9 @@ def step(self) -> None:
instance_info: InstanceInfo = self.scheduler.get_instance_info()

if self.scaling_down:
instance_info.num_running_request = 1
instance_info.num_available_gpu_block = -self.cache_config.num_gpu_blocks
instance_info.num_available_gpu_block_waiting = -self.cache_config.num_gpu_blocks
instance_info.num_running_requests = 1
instance_info.num_available_gpu_blocks = -self.cache_config.num_gpu_blocks
instance_info.num_available_gpu_blocks_waiting = -self.cache_config.num_gpu_blocks

instance_info.instance_id = self.instance_id
instance_info.step_id = next(self.step_counter)
@@ -132,7 +136,7 @@ def step(self) -> None:
blocks = self.scheduler.block_manager.get_block_table(seq)
tot_blocks.extend(blocks)
tot_blocks = set(tot_blocks)
instance_info.num_block_last_running_request = len(tot_blocks)
instance_info.num_blocks_last_running_request = len(tot_blocks)

self.free_request_states(instance_info.finished_request_ids)

@@ -177,11 +181,14 @@ def __init__(
instance_id: int,
migration_config: MigrationConfig,
engine_args: EngineArgs,
placement_group: "PlacementGroup"
placement_group: "PlacementGroup" = None,
node_id: str = None
) -> None:
assert migration_config.migration_backend == "rpc", "Gloo support will be released later."
self.engine: LLMEngineLlumnix = LLMEngineLlumnix.from_engine_args(engine_args=engine_args, instance_id=instance_id,
placement_group=placement_group)
self.engine: LLMEngineLlumnix = LLMEngineLlumnix.from_engine_args(engine_args=engine_args,
instance_id=instance_id,
placement_group=placement_group,
node_id=node_id)
# multi-instance args
self.engine.scheduler = SchedulerLlumnix(self.engine.scheduler_config, self.engine.cache_config, self.engine.lora_config)
self.engine.output_processor.scheduler = self.engine.scheduler
@@ -190,8 +197,9 @@
if len(self.worker_handle_list) + 1 == self.engine.parallel_config.world_size:
self.worker_handle_list.insert(0, ray.get_actor(f"instance_{self.instance_id}", namespace="llumnix"))
self._run_workers("init_migration", num_migration_cache_blocks=migration_config.migration_cache_blocks,\
src_worker_handle_list=self.worker_handle_list,
placement_group=placement_group)
src_worker_handle_list=self.worker_handle_list,
placement_group=placement_group,
node_id=node_id)
self._thread = threading.Thread(
target=self._start_engine_loop, args=(), daemon=True, name="engine_loop"
)
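The "Do not hack here" TODO refers to the class-attribute handoff above: `from_engine_args` sets `executor_class.node_id` before vLLM constructs the executor, so `_init_workers_ray` can read it without changing the constructor signature. A toy sketch of that pattern with illustrative names (not the real vLLM classes):

```
class FakeExecutor:
    # Set by the caller before construction; this shared class state is why
    # the PR leaves a TODO to remove the hack.
    node_id: str = None

    def __init__(self):
        # Stands in for _init_workers_ray reading self.node_id.
        self.pinned_node = self.node_id

FakeExecutor.node_id = "node-abc123"  # hypothetical node id
executor = FakeExecutor()
assert executor.pinned_node == "node-abc123"
```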