[workflow] fixed build CI (hpcaitech#5240)
* [workflow] fixed build CI

* polish

* polish

* polish

* polish

* polish
FrankLeeeee authored and flybird11111 committed Jan 29, 2024
1 parent 1149884 commit e5a33da
Showing 9 changed files with 205 additions and 250 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build_on_pr.yml
@@ -201,4 +201,4 @@ jobs:
        uses: actions/upload-artifact@v3
        with:
          name: report
          path: report/
          path: report/
2 changes: 1 addition & 1 deletion .github/workflows/build_on_schedule.yml
@@ -83,4 +83,4 @@ jobs:
SERVER_URL: ${{github.server_url }}
REPO: ${{ github.repository }}
RUN_ID: ${{ github.run_id }}
WEBHOOK_URL: ${{ secrets.LARK_NOTIFICATION_WEBHOOK_URL }}
WEBHOOK_URL: ${{ secrets.LARK_NOTIFICATION_WEBHOOK_URL }}
2 changes: 1 addition & 1 deletion tests/kit/model_zoo/registry.py
@@ -102,4 +102,4 @@ def get_sub_registry(
        return new_dict


model_zoo = ModelZooRegistry()
model_zoo = ModelZooRegistry()
2 changes: 1 addition & 1 deletion tests/test_booster/test_plugin/test_gemini_plugin.py
@@ -185,4 +185,4 @@ def test_gemini_plugin_3d(early_stop: bool = True):


if __name__ == "__main__":
    test_gemini_plugin(early_stop=False)
    test_gemini_plugin(early_stop=False)
163 changes: 71 additions & 92 deletions tests/test_booster/test_plugin/test_low_level_zero_plugin.py
@@ -1,106 +1,85 @@
from typing import Optional

import torch
import torch.distributed as dist
from torch.optim import Adam
from torchvision.models import resnet18
from utils import shared_tempdir

import colossalai
from colossalai.accelerator import get_accelerator
from colossalai.booster import Booster
from colossalai.booster.plugin import LowLevelZeroPlugin

# from colossalai.nn.optimizer import HybridAdam
from colossalai.testing import clear_cache_before_run, parameterize, rerun_if_address_is_in_use, spawn
from tests.kit.model_zoo import COMMON_MODELS, IS_FAST_TEST, model_zoo

# These models are not compatible with AMP
_AMP_ERR_MODELS = ["timm_convit", "deepfm_interactionarch"]
# These models have no parameters
_LOW_LEVEL_ZERO_ERR_MODELS = ["dlrm_interactionarch"]
# These models will cause stuck, to be fixed
_STUCK_MODELS = ["transformers_albert_for_multiple_choice"]


from colossalai.nn.optimizer import HybridAdam
from colossalai.testing import (
    check_state_dict_equal,
    clear_cache_before_run,
    parameterize,
    rerun_if_address_is_in_use,
    spawn,
)
from colossalai.zero import LowLevelZeroOptimizer


# stage 1 and 2 process the optimizer/mode the same way
# only test 2 is fine
@clear_cache_before_run()
def run_fn(stage, model_fn, data_gen_fn, output_transform_fn) -> Optional[str]:
    device = get_accelerator().get_current_device()
    try:
        plugin = LowLevelZeroPlugin(stage=stage, max_norm=1.0, initial_scale=2**5)
        booster = Booster(plugin=plugin)
        model = model_fn()
        optimizer = Adam(model.parameters(), lr=1e-3)
        criterion = lambda x: x.mean()
        data = data_gen_fn()

        data = {
            k: v.to(device) if torch.is_tensor(v) or "Tensor" in v.__class__.__name__ else v for k, v in data.items()
        }

        model, optimizer, criterion, _, _ = booster.boost(model, optimizer, criterion)

        output = model(**data)
        output = output_transform_fn(output)
        output_key = list(output.keys())[0]
        loss = criterion(output[output_key])

        booster.backward(loss, optimizer)
        optimizer.step()

    except Exception as e:
        return repr(e)


@parameterize("stage", [2])
def check_low_level_zero_plugin(stage: int, early_stop: bool = True):
"""check low level zero plugin over model zoo
Args:
stage (int), stage of low level zero plugin
early_stop (bool, optional): Whether to stop when getting the first error. Defaults to True.
"""
passed_models = []
failed_info = {} # (model_name, error) pair
ignore_models = _AMP_ERR_MODELS + _LOW_LEVEL_ZERO_ERR_MODELS + _STUCK_MODELS
skipped_models = []

if IS_FAST_TEST:
registry = model_zoo.get_sub_registry(COMMON_MODELS)
else:
registry = model_zoo

for name, (model_fn, data_gen_fn, output_transform_fn, _, _) in registry.items():
# FIXME(ver217): fix these models
if name in ignore_models:
skipped_models.append(name)
continue
err = run_fn(stage, model_fn, data_gen_fn, output_transform_fn)

get_accelerator().empty_cache()

if err is None:
passed_models.append(name)
else:
failed_info[name] = err
if early_stop:
break

if dist.get_rank() == 0:
print(f"Passed models({len(passed_models)}): {passed_models}\n\n")
print(f"Failed models({len(failed_info)}): {list(failed_info.keys())}\n\n")
print(f"Skipped models({len(skipped_models)}): {skipped_models}\n\n")
assert len(failed_info) == 0, "\n".join([f"{k}: {v}" for k, v in failed_info.items()])


def run_dist(rank, world_size, port, early_stop: bool = True):
    # init dist env
    colossalai.launch(config=dict(), rank=rank, world_size=world_size, port=port, host="localhost")
    check_low_level_zero_plugin(early_stop=early_stop)
@parameterize("shard", [True, False])
@parameterize("offload", [False, True])
def check_low_level_zero_checkpointIO(stage: int, shard: bool, offload: bool):
    plugin = LowLevelZeroPlugin(stage=stage, max_norm=1.0, initial_scale=32, cpu_offload=offload)
    booster = Booster(plugin=plugin)
    model = resnet18()
    criterion = lambda x: x.mean()
    optimizer = HybridAdam((model.parameters()), lr=0.001)
    model, optimizer, criterion, _, _ = booster.boost(model, optimizer, criterion)

    x = torch.randn(1, 3, 224, 224, device="cuda")
    output = model(x)
    loss = criterion(output)
    booster.backward(loss, optimizer)
    optimizer.step()
    with shared_tempdir() as tempdir:
        model_ckpt_path = f"{tempdir}/model"
        optimizer_ckpt_path = f"{tempdir}/optimizer"
        # lr scheduler is tested in test_torch_ddp_checkpoint_io.py and low level zero does not change it, we can skip it here
        booster.save_model(model, model_ckpt_path, shard=shard)
        booster.save_optimizer(optimizer, optimizer_ckpt_path, shard=shard)

        dist.barrier()

        new_model = resnet18()
        new_optimizer = HybridAdam((new_model.parameters()), lr=0.001)
        new_model, new_optimizer, _, _, _ = booster.boost(new_model, new_optimizer)

        booster.load_model(new_model, model_ckpt_path)
        check_state_dict_equal(model.state_dict(), new_model.state_dict(), False)
        # check master weight
        assert isinstance(new_optimizer, LowLevelZeroOptimizer)
        working_param_id_set = set(id(p) for p in new_model.parameters())
        for p_id, master_param in new_optimizer._param_store.working_to_master_param.items():
            assert p_id in working_param_id_set
            working_param = new_optimizer._param_store.master_to_working_param[id(master_param)]
            padding = new_optimizer._param_store.get_param_padding_size(working_param)
            padded_param = torch.nn.functional.pad(working_param.data.view(-1), (0, padding))
            working_shard = padded_param.chunk(dist.get_world_size())[dist.get_rank()]
            assert torch.equal(
                working_shard, master_param.data.view(-1).to(dtype=padded_param.dtype, device=padded_param.device)
            )

        booster.load_optimizer(new_optimizer, optimizer_ckpt_path)
        check_state_dict_equal(optimizer.optim.state_dict(), new_optimizer.optim.state_dict(), False)
    torch.cuda.empty_cache()


def run_dist(rank, world_size, port):
    colossalai.launch(config=(dict()), rank=rank, world_size=world_size, port=port, host="localhost")
    check_low_level_zero_checkpointIO()
    torch.cuda.empty_cache()


@rerun_if_address_is_in_use()
def test_low_level_zero_plugin(early_stop: bool = True):
    spawn(run_dist, 2, early_stop=early_stop)
@clear_cache_before_run()
def test_low_level_zero_checkpointIO():
    spawn(run_dist, 2)


if __name__ == "__main__":
    test_low_level_zero_plugin(early_stop=False)
    test_low_level_zero_checkpointIO()
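
Note for readers skimming this diff: the new check_low_level_zero_checkpointIO test above boils down to a save/load round trip through the Booster API. The sketch below is purely illustrative and is not part of the commit; the toy model, checkpoint paths, and hyperparameters are assumptions, while the calls themselves (colossalai.launch, Booster, LowLevelZeroPlugin, boost, backward, save_model/load_model, save_optimizer/load_optimizer) all appear in the diff above.

# Illustrative sketch only, not part of this commit.
# Assumes a CUDA device and that this function is spawned once per rank, like run_dist above.
import torch
import torch.distributed as dist
import torch.nn as nn
from torch.optim import Adam

import colossalai
from colossalai.booster import Booster
from colossalai.booster.plugin import LowLevelZeroPlugin


def checkpoint_round_trip(rank, world_size, port):
    colossalai.launch(config=dict(), rank=rank, world_size=world_size, port=port, host="localhost")

    booster = Booster(plugin=LowLevelZeroPlugin(stage=2, max_norm=1.0, initial_scale=32))
    model = nn.Linear(8, 8)  # toy model; the real test uses resnet18
    optimizer = Adam(model.parameters(), lr=1e-3)
    model, optimizer, *_ = booster.boost(model, optimizer)

    # one training step so the optimizer has state worth checkpointing
    loss = model(torch.randn(4, 8, device="cuda")).mean()
    booster.backward(loss, optimizer)
    optimizer.step()

    # save, then load into a fresh model/optimizer pair (paths are placeholders)
    booster.save_model(model, "/tmp/model", shard=True)
    booster.save_optimizer(optimizer, "/tmp/optimizer", shard=True)
    dist.barrier()

    new_model = nn.Linear(8, 8)
    new_optimizer = Adam(new_model.parameters(), lr=1e-3)
    new_model, new_optimizer, *_ = booster.boost(new_model, new_optimizer)
    booster.load_model(new_model, "/tmp/model")
    booster.load_optimizer(new_optimizer, "/tmp/optimizer")

Where the real test goes further is in comparing the reloaded state dicts with check_state_dict_equal and in verifying the ZeRO master weights against the padded, sharded working parameters.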
127 changes: 39 additions & 88 deletions tests/test_booster/test_plugin/test_torch_ddp_plugin.py
@@ -1,119 +1,70 @@
from contextlib import nullcontext

import torch
import torch.distributed as dist
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.optim import SGD
from torchvision.models import resnet18
from utils import shared_tempdir

import colossalai
from colossalai.booster import Booster
from colossalai.booster.plugin import TorchDDPPlugin
from colossalai.interface import OptimizerWrapper
from colossalai.testing import clear_cache_before_run, rerun_if_address_is_in_use, spawn
from tests.kit.model_zoo import COMMON_MODELS, IS_FAST_TEST, model_zoo
from colossalai.testing import check_state_dict_equal, parameterize, rerun_if_address_is_in_use, spawn


@clear_cache_before_run()
def run_fn(model_fn, data_gen_fn, output_transform_fn):
@parameterize("shard", [True, False])
@parameterize("size_per_shard", [16, 128])
def check_torch_ddp_checkpointIO(shard: bool, size_per_shard: int):
    plugin = TorchDDPPlugin()
    booster = Booster(plugin=plugin)
    model = model_fn()
    optimizer = SGD(model.parameters(), lr=1e-3)
    model = resnet18()
    criterion = lambda x: x.mean()
    data = data_gen_fn()

    data = {k: v.to("cuda") if torch.is_tensor(v) or "Tensor" in v.__class__.__name__ else v for k, v in data.items()}

    model, optimizer, criterion, _, _ = booster.boost(model, optimizer, criterion)
    optimizer = SGD((model.parameters()), lr=0.001)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.1)
    model, optimizer, criterion, _, _ = booster.boost(model, optimizer, criterion, lr_scheduler=scheduler)

    assert isinstance(model.module, DDP)
    assert isinstance(optimizer, OptimizerWrapper)

    output = model(**data)
    output = output_transform_fn(output)
    output_key = list(output.keys())[0]
    loss = criterion(output[output_key])

    x = torch.randn(4, 3, 224, 224)
    x = x.to("cuda")
    output = model(x)
    loss = criterion(output)
    booster.backward(loss, optimizer)
    optimizer.clip_grad_by_norm(1.0)
    optimizer.step()
    scheduler.step()

    with shared_tempdir() as tempdir:
        model_ckpt_path = f"{tempdir}/model"
        optimizer_ckpt_path = f"{tempdir}/optimizer"
        lr_scheduler_ckpt_path = f"{tempdir}/lr_scheduler"
        booster.save_model(model, model_ckpt_path, shard=shard, size_per_shard=size_per_shard)
        booster.save_optimizer(optimizer, optimizer_ckpt_path, shard=shard, size_per_shard=size_per_shard)
        booster.save_lr_scheduler(scheduler, lr_scheduler_ckpt_path)
        dist.barrier()

def check_torch_ddp_plugin():
    if IS_FAST_TEST:
        registry = model_zoo.get_sub_registry(COMMON_MODELS)
    else:
        registry = model_zoo

    for name, (model_fn, data_gen_fn, output_transform_fn, _, _) in registry.items():
        if name == "dlrm_interactionarch":
            continue
        run_fn(model_fn, data_gen_fn, output_transform_fn)
        torch.cuda.empty_cache()


class DummyModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.weight = nn.Parameter(torch.rand(1))

    def forward(self, x):
        return self.weight * x


def check_torch_ddp_no_sync():
    plugin = TorchDDPPlugin()
    booster = Booster(plugin=plugin)

    model = DummyModel()
    criterion = lambda x: x.mean()
    optimizer = SGD(model.parameters(), lr=1e-3)
    # create a custom dataset with 0 to 10
    dataset = torch.arange(0, 10)
    train_dataloader = plugin.prepare_dataloader(dataset, batch_size=2)
    model, optimizer, criterion, train_dataloader, _ = booster.boost(
        model, optimizer, criterion, dataloader=train_dataloader
    )

    def fwd_bwd():
        output = model(batch.cuda())
        loss = criterion(output)
        booster.backward(loss, optimizer)
        new_model = resnet18()
        new_optimizer = SGD((new_model.parameters()), lr=0.001)
        new_scheduler = torch.optim.lr_scheduler.StepLR(new_optimizer, step_size=1, gamma=0.1)
        new_model, new_optimizer, _, _, new_scheduler = booster.boost(
            new_model, new_optimizer, lr_scheduler=new_scheduler
        )

    def get_grad_set_over_all_ranks():
        for p in model.parameters():
            # grad shape is (1, )
            assert p.grad.shape == (1,)
            grad_list = [torch.empty_like(p.grad) for _ in range(dist.get_world_size())]
            dist.all_gather(grad_list, p.grad)
            # get grad set of all ranks
            grad_set = set([grad.item() for grad in grad_list])
            # as the model only has one parameter, we can return here
            return grad_set
        booster.load_model(new_model, model_ckpt_path)
        check_state_dict_equal(model.state_dict(), new_model.state_dict(), False)

    for i, batch in enumerate(train_dataloader):
        if i > 1:
            # only check the first two batches
            break
        # no_sync for the first batch, sync for the second batch
        ctx = booster.no_sync(model) if i == 0 else nullcontext()
        with ctx:
            fwd_bwd()
        grad_set = get_grad_set_over_all_ranks()
        # for the first batch, all ranks should have different grads
        # for the second batch, as grad is synchronized, all ranks should have the same grads
        target_num_different_grad = dist.get_world_size() if i == 0 else 1
        assert len(grad_set) == target_num_different_grad
        booster.load_optimizer(new_optimizer, optimizer_ckpt_path)
        check_state_dict_equal(optimizer.state_dict(), new_optimizer.state_dict(), False)
        booster.load_lr_scheduler(new_scheduler, lr_scheduler_ckpt_path)
        check_state_dict_equal(scheduler.state_dict(), new_scheduler.state_dict(), False)


def run_dist(rank, world_size, port):
    # init dist env
    colossalai.launch(config=dict(), rank=rank, world_size=world_size, port=port, host="localhost")
    check_torch_ddp_plugin()
    check_torch_ddp_no_sync()
    colossalai.launch(config=(dict()), rank=rank, world_size=world_size, port=port, host="localhost")
    check_torch_ddp_checkpointIO()


@rerun_if_address_is_in_use()
def test_torch_ddp_plugin():
    spawn(run_dist, 2)
def test_torch_ddp_checkpointIO():
    spawn(run_dist, 2)
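
As an aside, the removed check_torch_ddp_no_sync test above relies on booster.no_sync suppressing DDP's gradient all-reduce until the context exits, which is the same behaviour a gradient-accumulation loop depends on. The sketch below is a hedged illustration, not part of the commit: the tiny model, dataset, and accumulation_steps value are assumptions, and only calls that appear in the diff (TorchDDPPlugin, prepare_dataloader, boost, no_sync, backward) plus standard PyTorch are used.

# Illustrative sketch only, not part of this commit. Assumes a distributed
# environment was already launched per rank, as in run_dist above.
from contextlib import nullcontext

import torch
import torch.nn as nn
from torch.optim import SGD

from colossalai.booster import Booster
from colossalai.booster.plugin import TorchDDPPlugin


def train_with_accumulation(accumulation_steps: int = 4):  # accumulation_steps is hypothetical
    plugin = TorchDDPPlugin()
    booster = Booster(plugin=plugin)
    model = nn.Linear(1, 1)  # toy model
    optimizer = SGD(model.parameters(), lr=1e-3)
    criterion = lambda x: x.mean()
    train_dataloader = plugin.prepare_dataloader(torch.arange(0, 16, dtype=torch.float32), batch_size=2)
    model, optimizer, criterion, train_dataloader, _ = booster.boost(
        model, optimizer, criterion, dataloader=train_dataloader
    )

    for i, batch in enumerate(train_dataloader):
        is_sync_step = (i + 1) % accumulation_steps == 0
        # under no_sync, DDP skips the gradient all-reduce; local grads accumulate and
        # are synchronized on the first backward pass after the context is exited
        ctx = nullcontext() if is_sync_step else booster.no_sync(model)
        with ctx:
            loss = criterion(model(batch.cuda().unsqueeze(-1)))
            booster.backward(loss, optimizer)
        if is_sync_step:
            optimizer.step()
            optimizer.zero_grad()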