Commit 51924a1: Merge branch 'main' into misc

ZeldaHuang committed Aug 28, 2024
2 parents: be3d9e6 + 56c0c3c

Showing 6 changed files with 31 additions and 26 deletions.
benchmark/benchmark_serving.py (1 addition, 1 deletion)
@@ -83,7 +83,7 @@ async def query_model_vllm(prompt, verbose, ip_ports):
global num_finished_requests

async with aiohttp.ClientSession(timeout=timeout) as session:
-         # TODO(yiwang): Remove hard codes of params.
+         # TODO(s5u13b): Remove hard codes of params.
best_of = 1
use_beam_search = False
output_len = expected_response_len
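Note: the benchmark helper above follows the standard aiohttp client pattern. A minimal sketch of such a query function is below; the `/generate` endpoint, payload field names, and timeout value are assumptions for illustration, and only the hard-coded `best_of`, `use_beam_search`, and output-length parameters come from the diff.

```python
import asyncio
import aiohttp

async def query_model(prompt: str, ip_port: str, expected_response_len: int) -> dict:
    # Hard-coded sampling params, mirroring the TODO above.
    payload = {
        "prompt": prompt,
        "best_of": 1,
        "use_beam_search": False,
        "max_tokens": expected_response_len,  # assumed field name
    }
    timeout = aiohttp.ClientTimeout(total=3600)  # assumed timeout value
    async with aiohttp.ClientSession(timeout=timeout) as session:
        # "/generate" is an assumed endpoint for illustration.
        async with session.post(f"http://{ip_port}/generate", json=payload) as resp:
            return await resp.json()

# Example: asyncio.run(query_model("Hello", "127.0.0.1:8000", 128))
```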
docs/Arguments.md (11 additions, 8 deletions)
@@ -37,16 +37,17 @@ usage: -m llumnix.entrypoints.vllm.api_server [-h]
[--migration-num-layers MIGRATION_NUM_LAYERS]
[--last-stage-max-blocks LAST_STAGE_MAX_BLOCKS]
[--max-stages MAX_STAGES]
```

`--disable-fixed-node-init-instance`
- - Disable fixing the instance's placement to the current node.
+ - Disable fixing the placement of instance to current node.

`--disable-init-instance-by-manager`
- - Disable the initialization of instance by the manager.
+ - Disable initializing instance by manager.

`--initial-instances`
- - Number of model instances created at initialization.
+ - Number of instances created at initialization.
- Default: 1

`--load-metric`
@@ -56,7 +57,7 @@ usage: -m llumnix.entrypoints.vllm.api_server [-h]

`--polling-interval`
- Time interval(s) to update instance info and pair migration.
- - Default: 0.1
+ - Default: 0.05

`--dispatch-policy`
- Request dispatch policy.
Expand All @@ -72,6 +73,8 @@ usage: -m llumnix.entrypoints.vllm.api_server [-h]

`--pair-migration-policy`
- Pair migration policy.
+ - Possible choices: balanced, defrag_constrained, defrag_relaxed
+ - Default: "defrag_constrained"

`--migrate-out-threshold`
- Migrate out instance load threshold.
@@ -83,7 +86,7 @@ usage: -m llumnix.entrypoints.vllm.api_server [-h]
- Default: "SJF"

`--enable-defrag`
- - Enable defragmentation.
+ - Enable defragmentation through migration based on virtual usage.
- Default: False

`--enable-scaling`
Expand All @@ -104,15 +107,15 @@ usage: -m llumnix.entrypoints.vllm.api_server [-h]
`--scaling-policy`
- Scaling policy.
- Possible choices: max_load, avg_load
- - default: "max_load"
+ - default: "avg_load"

`--scale-up-threshold`
- Scale up threshold.
- - Default: 4
+ - Default: 10

`--scale-down-threshold`
- Scale down threshold.
- - Default: 100
+ - Default: 60

`--disable-log-requests-manager`
- Disable logging requests in manager.
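Note: the defaults documented above correspond to an invocation along the following lines. This command is assembled from the flags in this file purely for illustration and does not appear in the diff:

```
python -m llumnix.entrypoints.vllm.api_server \
    --initial-instances 1 \
    --polling-interval 0.05 \
    --pair-migration-policy defrag_constrained \
    --scaling-policy avg_load \
    --scale-up-threshold 10 \
    --scale-down-threshold 60
```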
llumnix/arg_utils.py (6 additions, 6 deletions)
@@ -54,7 +54,7 @@ class EngineManagerArgs:
gpu_type: str = "a10"
migration_backend_init_timeout: float = 10.0
migration_backend: str = "rpc"
-     migration_cache_blocks: int = 32
+     migration_cache_blocks: int = 512
migration_num_layers: int = 1
last_stage_max_blocks: int = 16
max_stages: int = 3
@@ -99,11 +99,11 @@ def add_cli_args(
help='disable fixing the placement of instance to current node')
parser.add_argument('--disable-init-instance-by-manager',
action='store_true',
-                     help='disable the initialization of the instance by the manager')
+                     help='disable initializing instance by manager')
parser.add_argument('--initial-instances',
type=int,
default=EngineManagerArgs.initial_instances,
-                     help='number of model instances created at initialzation')
+                     help='number of instances created at initialzation')

parser.add_argument('--load-metric',
type=str,
@@ -145,7 +145,7 @@ def add_cli_args(
parser.add_argument('--enable-defrag',
type=bool,
default=EngineManagerArgs.enable_defrag,
-                     help='enable defragmentation')
+                     help='enable defragmentation through migration based on virtual usage')

parser.add_argument('--enable-scaling',
action='store_true',
@@ -200,15 +200,15 @@ def add_cli_args(
type=str,
default=EngineManagerArgs.migration_backend,
choices=['gloo','nccl','rpc'],
-                     help='communication backend during migration')
+                     help='communication backend of migration')
parser.add_argument('--migration-backend-init-timeout',
type=float,
default=EngineManagerArgs.migration_backend_init_timeout,
help='timeout(s) for initializing migration backend')
parser.add_argument('--migration-cache-blocks',
type=int,
default=EngineManagerArgs.migration_cache_blocks,
-                     help='cache blocks num during migration')
+                     help='number of cache blocks in migration')
parser.add_argument('--migration-num-layers',
type=int,
default=EngineManagerArgs.migration_num_layers,
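Note: arg_utils.py pairs a dataclass of defaults with an argparse registrar, so each flag's default lives in one place. A reduced sketch of that pattern is below; `EngineManagerArgsSketch` and the `from_cli_args` helper are hypothetical names for illustration, and only three of the many flags are shown.

```python
import argparse
from dataclasses import dataclass, fields

@dataclass
class EngineManagerArgsSketch:
    # Defaults mirror a few fields from the diff.
    initial_instances: int = 1
    migration_backend: str = "rpc"
    migration_cache_blocks: int = 512  # new default after this merge

    @classmethod
    def add_cli_args(cls, parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
        # argparse defaults are read off the dataclass, so CLI and code stay in sync.
        parser.add_argument('--initial-instances', type=int,
                            default=cls.initial_instances,
                            help='number of instances created at initialization')
        parser.add_argument('--migration-backend', type=str,
                            default=cls.migration_backend,
                            choices=['gloo', 'nccl', 'rpc'],
                            help='communication backend of migration')
        parser.add_argument('--migration-cache-blocks', type=int,
                            default=cls.migration_cache_blocks,
                            help='number of cache blocks in migration')
        return parser

    @classmethod
    def from_cli_args(cls, args: argparse.Namespace) -> "EngineManagerArgsSketch":
        # Hypothetical helper: keep only the fields this dataclass defines.
        names = {f.name for f in fields(cls)}
        return cls(**{k: v for k, v in vars(args).items() if k in names})

parser = EngineManagerArgsSketch.add_cli_args(argparse.ArgumentParser())
print(EngineManagerArgsSketch.from_cli_args(parser.parse_args([])))
```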
llumnix/backends/vllm/llm_engine.py (1 addition, 1 deletion)
@@ -77,7 +77,6 @@ def from_engine_args(
executor_class.migration_config = migration_config
else:
raise ValueError('unimplemented executor backend')
-         # TODO(s5u13b): Do not hack here.
# Hack to pass node_id to _init_workers_ray function.
executor_class.node_id = node_id
# Create the LLM engine.
@@ -231,6 +230,7 @@ def __init__(
node_id=node_id)
# multi-instance args
self.engine.scheduler = SchedulerLlumnix(self.engine.scheduler_config, self.engine.cache_config, self.engine.lora_config)
+         self.engine.scheduler.add_update_instance_info_callback(self.engine.update_instance_info)
self.engine.output_processor.scheduler = self.engine.scheduler
self.instance_id = instance_id
self.worker_handle_list = self.engine.model_executor.workers.copy()
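Note: the "Hack to pass node_id" comment refers to planting data on the executor class itself, because the engine, not the caller, instantiates the executor. A stripped-down illustration of that pattern follows; every name except `node_id` and `_init_workers_ray` is invented.

```python
class ExecutorSketch:
    # Planted as a class attribute before the engine instantiates this class,
    # because the engine (not our code) controls the constructor call.
    node_id: str = None

    def _init_workers_ray(self) -> None:
        # Reads the value off the class, e.g. to pin Ray workers to a node.
        print(f"initializing workers on node {self.node_id}")

def build_engine(executor_class, node_id: str):
    # Hack to pass node_id to _init_workers_ray (mirrors the diff).
    executor_class.node_id = node_id
    executor = executor_class()   # engine-side instantiation, no extra args
    executor._init_workers_ray()
    return executor

build_engine(ExecutorSketch, "node-0")
```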
llumnix/backends/vllm/scheduler.py (7 additions, 4 deletions)
@@ -82,6 +82,10 @@ def __init__(self, *args, **kwargs) -> None:
self.scheduler_lock = threading.Lock()
self.migrating_out_request_last_stage: List[SequenceGroupLlumnix] = []

+     def add_update_instance_info_callback(self, update_instance_info_callback):
+         self.update_instance_info_callback = update_instance_info_callback
+         self.update_instance_info_callback(self._get_instance_info())

def _preempt(
self,
seq_group: SequenceGroupLlumnix,
@@ -175,8 +179,7 @@ def free_src_request(self, backend_request: SequenceGroup) -> None:
logger.info("free seq {}".format(seq.seq_id))
self.free_seq(seq)

-     @scheduler_lock
-     def get_instance_info(self) -> InstanceInfo:
+     def _get_instance_info(self) -> InstanceInfo:
num_total_gpu_blocks = self.cache_config.num_gpu_blocks
num_free_gpu_blocks = self.block_manager.get_num_free_gpu_blocks()
num_used_gpu_blocks = num_total_gpu_blocks - num_free_gpu_blocks
@@ -200,8 +203,8 @@ def get_instance_info(self) -> InstanceInfo:
instance_info = InstanceInfo(
num_total_gpu_blocks=num_total_gpu_blocks,
num_watermark_blocks=self.block_manager.watermark_blocks,
-             num_free_gpu_blocks=num_free_gpu_blocks,
              num_used_gpu_blocks=num_used_gpu_blocks,
+             num_free_gpu_blocks=num_free_gpu_blocks,
gpu_cache_usage=gpu_cache_usage,
num_running_requests=len(self.running),
num_waiting_requests=len(self.waiting),
@@ -212,7 +215,7 @@ def get_instance_info(self) -> InstanceInfo:
)
for seq_group in self.running:
instance_info.running_seq_lens.extend([seq.get_len() for seq in seq_group.get_seqs()])
-         instance_info.num_seq = len(instance_info.running_seq_lens)
+         instance_info.num_seqs = len(instance_info.running_seq_lens)
if self.running:
instance_info.inference_type = self.running[-1].inference_type
instance_info.num_batched_tokens = sum([seq_group.request_len for seq_group in self.running])\
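Note: the new `add_update_instance_info_callback` is the scheduler half of a handshake with the engine (see the llm_engine.py hunk above): the engine registers its `update_instance_info` method, and the scheduler immediately pushes a fresh `InstanceInfo` so the engine never starts out stale. A self-contained sketch of that handshake, with `InstanceInfo` trimmed to two of the fields shown in the diff and the `*Sketch` class names invented:

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional

@dataclass
class InstanceInfo:
    # Two of the fields populated in _get_instance_info; the real class has more.
    num_running_requests: int = 0
    running_seq_lens: List[int] = field(default_factory=list)

class SchedulerSketch:
    def __init__(self) -> None:
        self.running: list = []
        self.update_instance_info_callback: Optional[Callable[[InstanceInfo], None]] = None

    def add_update_instance_info_callback(self, update_instance_info_callback) -> None:
        self.update_instance_info_callback = update_instance_info_callback
        # Push current state immediately so the registering engine starts in sync.
        self.update_instance_info_callback(self._get_instance_info())

    def _get_instance_info(self) -> InstanceInfo:
        return InstanceInfo(num_running_requests=len(self.running))

class EngineSketch:
    def __init__(self, scheduler: SchedulerSketch) -> None:
        self.instance_info: Optional[InstanceInfo] = None
        # Mirrors the registration line added in llm_engine.py.
        scheduler.add_update_instance_info_callback(self.update_instance_info)

    def update_instance_info(self, instance_info: InstanceInfo) -> None:
        self.instance_info = instance_info

engine = EngineSketch(SchedulerSketch())
print(engine.instance_info)  # InstanceInfo(num_running_requests=0, running_seq_lens=[])
```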
llumnix/llm_engine_manager.py (5 additions, 6 deletions)
@@ -38,8 +38,7 @@
CLEARING_INTERVAL = 3600
RETRIES_INTERVALS = 5.0

- # TODO(yiwang): add unit test for CI
- # TODO(yiwang): Fix the logger when manager failover.
+ # TODO(s5u13b): Fix the logger when manager failover.


class LLMEngineManager:
@@ -67,7 +66,7 @@ def __init__(self,
logger.info("num_instances: {}".format(self.num_instances))
logger.info("max_instances: {}, min_instances: {}".format(self.max_instances, self.min_instances))

-         # TODO(yiwang): refactor auto-scaling
+         # TODO(s5u13b): refactor auto-scaling

self.instances: Dict[str, Llumlet] = {}
self.instance_migrating: Dict[str, bool] = {}
@@ -252,8 +251,7 @@ async def _migrate(self) -> None:
call_migrate_instance_pairs.append(migrate_instance_pair)
task = self.instances[migrate_out_instance_id].migrate_out.remote(migrate_in_instance_name)
migration_tasks.append(task)
-             # TODO(yiwang): It's not necessary for manager to await for each migration.
-             # TODO(yiwang): Migration failover could be implemented in Llumlet rather than manager.
+             # TODO(s5u13b): Migration failover could be implemented in Llumlet rather than manager.
rets = await asyncio.gather(*migration_tasks, return_exceptions=True)
await self._post_migrate(rets, call_migrate_instance_pairs)
# pylint: disable=W0703
@@ -426,7 +424,8 @@ def from_args(cls,
logger.info("engine_manager_args: {}".format(engine_manager_args))
return engine_manager

-     # TODO(s5u13b): significant duplication with llumlet_utils.init_llumlets. consider reducing duplicate codes.
+     # TODO(s5u13b): Significant duplication with llumlet_utils.init_llumlets. Consider reducing duplicate codes.
+     # TODO(s5u13b): Fix the logger when enabling init instance by manager.
def init_llumlets(self,
engine_args,
node_id: str) -> Tuple[List[str], List[Llumlet]]:
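Note: `_migrate` fans out one `migrate_out` task per selected instance pair and gathers them with `return_exceptions=True`, so a failed migration comes back as a value for `_post_migrate` to handle instead of aborting the batch. A runnable sketch of that fan-out pattern, with a plain coroutine standing in for the Ray actor call:

```python
import asyncio

async def migrate_out(src: str, dst: str) -> str:
    # Stand-in for the Ray remote call
    # self.instances[migrate_out_instance_id].migrate_out.remote(...) in the real code.
    if src == "overloaded-bad":
        raise RuntimeError(f"migration {src} -> {dst} failed")
    await asyncio.sleep(0.01)
    return f"{src} -> {dst}"

async def _migrate(pairs):
    migration_tasks = [migrate_out(src, dst) for src, dst in pairs]
    # return_exceptions=True: failures come back as values, so one bad pair
    # cannot abort the whole batch; _post_migrate would inspect them.
    rets = await asyncio.gather(*migration_tasks, return_exceptions=True)
    for pair, ret in zip(pairs, rets):
        status = f"failed: {ret}" if isinstance(ret, Exception) else f"done: {ret}"
        print(f"pair {pair} {status}")

asyncio.run(_migrate([("i0", "i1"), ("overloaded-bad", "i2")]))
```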
