[Misc] Update instance info of engine in a timely manner (#17)
s5u13b committed Aug 27, 2024
1 parent 70b8c9a commit 56c0c3c
Showing 6 changed files with 47 additions and 27 deletions.
2 changes: 1 addition & 1 deletion benchmark/benchmark_serving.py
@@ -83,7 +83,7 @@ async def query_model_vllm(prompt, verbose, ip_ports):
global num_finished_requests

async with aiohttp.ClientSession(timeout=timeout) as session:
- # TODO(yiwang): Remove hard codes of params.
+ # TODO(s5u13b): Remove hard codes of params.
best_of = 1
use_beam_search = False
output_len = expected_response_len
19 changes: 11 additions & 8 deletions docs/Arguments.md
@@ -37,16 +37,17 @@ usage: -m llumnix.entrypoints.vllm.api_server [-h]
[--migration-num-layers MIGRATION_NUM_LAYERS]
[--last-stage-max-blocks LAST_STAGE_MAX_BLOCKS]
[--max-stages MAX_STAGES]
```

`--disable-fixed-node-init-instance`
- - Disable fixing the instance's placement to the current node.
+ - Disable fixing the placement of instance to current node.

`--disable-init-instance-by-manager`
- - Disable the initialization of instance by the manager.
+ - Disable initializing instance by manager.

`--initial-instances`
- - Number of model instances created at initialization.
+ - Number of instances created at initialization.
- Default: 1

`--load-metric`
@@ -56,7 +57,7 @@ usage: -m llumnix.entrypoints.vllm.api_server [-h]

`--polling-interval`
- Time interval(s) to update instance info and pair migration.
- - Default: 0.1
+ - Default: 0.05

`--dispatch-policy`
- Request dispatch policy.
@@ -72,6 +73,8 @@ usage: -m llumnix.entrypoints.vllm.api_server [-h]

`--pair-migration-policy`
- Pair migration policy.
+ - Possible choices: balanced, defrag_constrained, defrag_relaxed
+ - Default: "defrag_constrained"

`--migrate-out-threshold`
- Migrate out instance load threshold.
@@ -83,7 +86,7 @@ usage: -m llumnix.entrypoints.vllm.api_server [-h]
- Default: "SJF"

`--enable-defrag`
- - Enable defragmentation.
+ - Enable defragmentation through migration based on virtual usage.
- Default: False

`--enable-scaling`
@@ -104,15 +107,15 @@ usage: -m llumnix.entrypoints.vllm.api_server [-h]
`--scaling-policy`
- Scaling policy.
- Possible choices: max_load, avg_load
- - default: "max_load"
+ - default: "avg_load"

`--scale-up-threshold`
- Scale up threshold.
- - Default: 4
+ - Default: 10

`--scale-down-threshold`
- Scale down threshold.
- - Default: 100
+ - Default: 60

`--disable-log-requests-manager`
- Disable logging requests in manager.
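For orientation, here is a minimal, self-contained `argparse` sketch of a few of the manager arguments documented above, with defaults mirroring the values after this change. This is an illustrative stand-in, not the actual llumnix parser; the authoritative definitions live in `EngineManagerArgs.add_cli_args` in `llumnix/arg_utils.py`.

```python
import argparse

# Simplified stand-in for a subset of the manager arguments documented above.
# Defaults mirror the values after this commit; the real parser lives in
# llumnix/arg_utils.py (EngineManagerArgs.add_cli_args).
parser = argparse.ArgumentParser(description="llumnix manager arguments (sketch)")
parser.add_argument('--initial-instances', type=int, default=1,
                    help='number of instances created at initialization')
parser.add_argument('--polling-interval', type=float, default=0.05,
                    help='time interval(s) to update instance info and pair migration')
parser.add_argument('--pair-migration-policy', type=str, default='defrag_constrained',
                    choices=['balanced', 'defrag_constrained', 'defrag_relaxed'])
parser.add_argument('--scaling-policy', type=str, default='avg_load',
                    choices=['max_load', 'avg_load'])
parser.add_argument('--scale-up-threshold', type=float, default=10)
parser.add_argument('--scale-down-threshold', type=float, default=60)

# Parse an example command line; unspecified flags fall back to the defaults above.
args = parser.parse_args(['--initial-instances', '2'])
print(args.polling_interval)  # 0.05
print(args.scaling_policy)    # avg_load
```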
12 changes: 6 additions & 6 deletions llumnix/arg_utils.py
@@ -54,7 +54,7 @@ class EngineManagerArgs:
gpu_type: str = "a10"
migration_backend_init_timeout: float = 10.0
migration_backend: str = "rpc"
- migration_cache_blocks: int = 32
+ migration_cache_blocks: int = 512
migration_num_layers: int = 1
last_stage_max_blocks: int = 16
max_stages: int = 3
@@ -99,11 +99,11 @@ def add_cli_args(
help='disable fixing the placement of instance to current node')
parser.add_argument('--disable-init-instance-by-manager',
action='store_true',
- help='disable the initialization of the instance by the manager')
+ help='disable initializing instance by manager')
parser.add_argument('--initial-instances',
type=int,
default=EngineManagerArgs.initial_instances,
- help='number of model instances created at initialzation')
+ help='number of instances created at initialzation')

parser.add_argument('--load-metric',
type=str,
@@ -145,7 +145,7 @@ def add_cli_args(
parser.add_argument('--enable-defrag',
type=bool,
default=EngineManagerArgs.enable_defrag,
- help='enable defragmentation')
+ help='enable defragmentation through migration based on virtual usage')

parser.add_argument('--enable-scaling',
action='store_true',
@@ -200,15 +200,15 @@ def add_cli_args(
type=str,
default=EngineManagerArgs.migration_backend,
choices=['gloo','nccl','rpc'],
- help='communication backend during migration')
+ help='communication backend of migration')
parser.add_argument('--migration-backend-init-timeout',
type=float,
default=EngineManagerArgs.migration_backend_init_timeout,
help='timeout(s) for initializing migration backend')
parser.add_argument('--migration-cache-blocks',
type=int,
default=EngineManagerArgs.migration_cache_blocks,
- help='cache blocks num during migration')
+ help='number of cache blocks in migration')
parser.add_argument('--migration-num-layers',
type=int,
default=EngineManagerArgs.migration_num_layers,
18 changes: 15 additions & 3 deletions llumnix/backends/vllm/llm_engine.py
@@ -79,7 +79,6 @@ def from_engine_args(
executor_class.migration_config = migration_config
else:
raise ValueError('unimplemented executor backend')
- # TODO(s5u13b): Do not hack here.
# Hack to pass node_id to _init_workers_ray function.
executor_class.node_id = node_id
# Create the LLM engine.
@@ -119,7 +118,7 @@ def _process_model_outputs(
def step(self) -> None:
output_list = super().step()

- instance_info: InstanceInfo = self.scheduler.get_instance_info()
+ instance_info: InstanceInfo = self.instance_info

if self.scaling_down:
instance_info.num_running_requests = 1
@@ -148,7 +147,19 @@ def step(self) -> None:
self._put_request_output_to_server(output_list, server_info_list)
self.instance_info = instance_info

- def _put_request_output_to_server(self, request_outputs, server_infos: List[ServerInfo]) -> None:
+ def update_instance_info(self, instance_info: InstanceInfo) -> None:
+     # These fields are updated after step.
+     if self.instance_info is not None:
+         instance_info.instance_id = self.instance_info.instance_id
+         instance_info.step_id = self.instance_info.step_id
+         instance_info.timestamp = self.instance_info.timestamp
+         instance_info.latency = self.instance_info.latency
+         instance_info.num_blocks_last_running_request = self.instance_info.num_blocks_last_running_request
+     self.instance_info = instance_info
+
+ def _put_request_output_to_server(self,
+                                   request_outputs: List[RequestOutput],
+                                   server_infos: List[ServerInfo]) -> None:
server_request_outputs = defaultdict(list)
server_queue: Dict[str, RayQueue] = {}
# Reorganize data in orther to put request output to queue in batch at one time.
@@ -192,6 +203,7 @@ def __init__(
node_id=node_id)
# multi-instance args
self.engine.scheduler = SchedulerLlumnix(self.engine.scheduler_config, self.engine.cache_config, self.engine.lora_config)
+ self.engine.scheduler.add_update_instance_info_callback(self.engine.update_instance_info)
self.engine.output_processor.scheduler = self.engine.scheduler
self.instance_id = instance_id
self.worker_handle_list = self.engine.model_executor.workers.copy()
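The engine-side change above stops pulling instance info from the scheduler on every step (`self.scheduler.get_instance_info()`) and instead keeps the most recently pushed snapshot in `self.instance_info`, with `update_instance_info` preserving the fields the engine itself fills in after `step()`. Below is a minimal sketch of that merge behaviour; the classes and field names are simplified stand-ins, not the real llumnix `InstanceInfo` or engine.

```python
from dataclasses import dataclass
from typing import Optional

# Simplified stand-in for llumnix's InstanceInfo; only a few fields are shown.
@dataclass
class FakeInstanceInfo:
    num_running_requests: int = 0
    # Fields below are filled in by the engine after each step.
    instance_id: str = ""
    step_id: int = 0
    latency: float = 0.0

class FakeEngine:
    def __init__(self) -> None:
        self.instance_info: Optional[FakeInstanceInfo] = None

    def update_instance_info(self, instance_info: FakeInstanceInfo) -> None:
        # Carry over the step-scoped fields from the previous snapshot so a
        # mid-step push does not clobber values the engine sets after step().
        if self.instance_info is not None:
            instance_info.instance_id = self.instance_info.instance_id
            instance_info.step_id = self.instance_info.step_id
            instance_info.latency = self.instance_info.latency
        self.instance_info = instance_info

engine = FakeEngine()
engine.update_instance_info(FakeInstanceInfo(num_running_requests=2))
engine.instance_info.instance_id = "instance_0"
engine.instance_info.step_id = 7
engine.update_instance_info(FakeInstanceInfo(num_running_requests=3))
print(engine.instance_info.num_running_requests, engine.instance_info.step_id)  # 3 7
```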
12 changes: 9 additions & 3 deletions llumnix/backends/vllm/scheduler.py
@@ -29,6 +29,7 @@

logger = init_logger(__name__)

+
# TODO(ZeldaHuang): adapt prefix cache and sliding window, now use v1 manager
class BlockManagerLlumnix(BlockSpaceManagerV1):
def get_free_blocks(self, num_required_blocks: int) -> BlockTable:
@@ -62,6 +63,10 @@ def __init__(self, *args, **kwargs) -> None:
self.scheduler_lock = threading.Lock()
self.migrating_out_request_last_stage = []

+ def add_update_instance_info_callback(self, update_instance_info_callback):
+     self.update_instance_info_callback = update_instance_info_callback
+     self.update_instance_info_callback(self._get_instance_info())
+
def _preempt(
self,
seq_group: SequenceGroup,
@@ -185,8 +190,7 @@ def free_src_request(self, backend_request: SequenceGroup) -> None:
logger.info("free seq {}".format(seq.seq_id))
self.free_seq(seq)

- @scheduler_lock
- def get_instance_info(self) -> InstanceInfo:
+ def _get_instance_info(self) -> InstanceInfo:
num_total_gpu_blocks = self.cache_config.num_gpu_blocks
num_free_gpu_blocks = self.block_manager.get_num_free_gpu_blocks()
num_used_gpu_blocks = num_total_gpu_blocks - num_free_gpu_blocks
@@ -210,8 +214,8 @@ def get_instance_info(self) -> InstanceInfo:
instance_info = InstanceInfo(
num_total_gpu_blocks=num_total_gpu_blocks,
num_watermark_blocks=self.block_manager.watermark_blocks,
- num_free_gpu_blocks=num_free_gpu_blocks,
num_used_gpu_blocks=num_used_gpu_blocks,
+ num_free_gpu_blocks=num_free_gpu_blocks,
gpu_cache_usage=gpu_cache_usage,
num_running_requests=len(self.running),
num_waiting_requests=len(self.waiting),
@@ -225,6 +229,7 @@ def get_instance_info(self) -> InstanceInfo:
for seq_group in self.running:
instance_info.running_seq_lens.extend([seq.get_len() for seq in seq_group.get_seqs()])
instance_info.num_seqs = len(instance_info.running_seq_lens)
+ # TODO(s5u13b): Only correct when using prefill preemption batching policy.
instance_info.num_batched_tokens = sum([seq_group.get_seqs()[0].get_len() for seq_group in self.prefilling_seq_groups])\
if self.prefilling_seq_groups else len(instance_info.running_seq_lens)
instance_info.finished_request_ids = [seq_group.request_id for seq_group in self.running if seq_group.is_finished()]
@@ -238,6 +243,7 @@ def schedule(self) -> Tuple[List[SequenceGroupMetadata], SchedulerOutputs]:
for scheduled_seq_group in scheduler_outputs.scheduled_seq_groups:
if scheduled_seq_group.seq_group.is_prefill():
self.prefilling_seq_groups.append(scheduled_seq_group.seq_group)
+ self.update_instance_info_callback(self._get_instance_info())
return seq_group_metadata_list, scheduler_outputs

@scheduler_lock
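On the scheduler side, the callback is registered once via `add_update_instance_info_callback` (which pushes an initial snapshot) and then invoked at the end of every `schedule()` call, so the engine always holds scheduling-time state without calling `get_instance_info()` itself. A condensed sketch of that flow with stand-in classes; the real scheduler builds a full `InstanceInfo` in `_get_instance_info`, and a plain dict is used here only to keep the example small.

```python
from typing import Callable, List

class FakeScheduler:
    def __init__(self) -> None:
        self.running: List[str] = []
        self.update_instance_info_callback: Callable[[dict], None] = lambda info: None

    def add_update_instance_info_callback(self, callback: Callable[[dict], None]) -> None:
        # Register the engine's update hook and push an initial snapshot.
        self.update_instance_info_callback = callback
        self.update_instance_info_callback(self._get_instance_info())

    def _get_instance_info(self) -> dict:
        # The real method returns an InstanceInfo; a dict keeps the sketch small.
        return {"num_running_requests": len(self.running)}

    def schedule(self) -> List[str]:
        scheduled = list(self.running)
        # Push fresh info to the engine at the end of every schedule() call.
        self.update_instance_info_callback(self._get_instance_info())
        return scheduled

snapshots = []
scheduler = FakeScheduler()
scheduler.add_update_instance_info_callback(snapshots.append)
scheduler.running.append("request-0")
scheduler.schedule()
print(snapshots)  # [{'num_running_requests': 0}, {'num_running_requests': 1}]
```

This push-style update appears to be what the commit title refers to: instance info is refreshed as part of scheduling rather than recomputed on demand when the manager polls.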
11 changes: 5 additions & 6 deletions llumnix/llm_engine_manager.py
@@ -38,8 +38,7 @@
CLEARING_INTERVAL = 3600
RETRIES_INTERVALS = 5.0

- # TODO(yiwang): add unit test for CI
- # TODO(yiwang): Fix the logger when manager failover.
+ # TODO(s5u13b): Fix the logger when manager failover.


class LLMEngineManager:
@@ -67,7 +66,7 @@ def __init__(self,
logger.info("num_instances: {}".format(self.num_instances))
logger.info("max_instances: {}, min_instances: {}".format(self.max_instances, self.min_instances))

- # TODO(yiwang): refactor auto-scaling
+ # TODO(s5u13b): refactor auto-scaling

self.instances: Dict[str, Llumlet] = {}
self.instance_migrating: Dict[str, bool] = {}
@@ -252,8 +251,7 @@ async def _migrate(self) -> None:
call_migrate_instance_pairs.append(migrate_instance_pair)
task = self.instances[migrate_out_instance_id].migrate_out.remote(migrate_in_instance_name)
migration_tasks.append(task)
- # TODO(yiwang): It's not necessary for manager to await for each migration.
- # TODO(yiwang): Migration failover could be implemented in Llumlet rather than manager.
+ # TODO(s5u13b): Migration failover could be implemented in Llumlet rather than manager.
rets = await asyncio.gather(*migration_tasks, return_exceptions=True)
await self._post_migrate(rets, call_migrate_instance_pairs)
# pylint: disable=W0703
@@ -426,7 +424,8 @@ def from_args(cls,
logger.info("engine_manager_args: {}".format(engine_manager_args))
return engine_manager

- # TODO(s5u13b): significant duplication with llumlet_utils.init_llumlets. consider reducing duplicate codes.
+ # TODO(s5u13b): Significant duplication with llumlet_utils.init_llumlets. Consider reducing duplicate codes.
+ # TODO(s5u13b): Fix the logger when enabling init instance by manager.
def init_llumlets(self,
engine_args,
node_id: str) -> Tuple[List[str], List[Llumlet]]:
