This repository has been archived by the owner on Oct 16, 2023. It is now read-only.

Commit

Merge pull request #111 from ver217/example/opt
add opt example
dujiangsu authored Aug 19, 2022
2 parents 35d4fef + 55859c0 commit 0ddbb40
Showing 7 changed files with 175 additions and 150 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -127,3 +127,5 @@ dmypy.json

# Pyre type checker
.pyre/

.vscode/
10 changes: 5 additions & 5 deletions energonai/engine/engine.py
@@ -19,7 +19,6 @@
from energonai.initialize import launch_from_multiprocess



logger = get_dist_logger('energonai')


@@ -61,7 +60,7 @@ def __init__(self,
# for TP, PP
self.rrefs = []
self.auto_pp = auto_pp

# for rpc
self.WORKER_NAME = "wok{}"
self._init_dist_rpc()
@@ -79,9 +78,10 @@ def _init_dist_rpc(self):
world_size=self.global_world_size,
host=self.host,
port=self.port)
rpc_backend_options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=16,
                                                      rpc_timeout=6000
                                                      # _transports=["uv"] TODO: potentially a bug
                                                      )
rpc.init_rpc(self.WORKER_NAME.format(0),
rank=0,
world_size=self.global_world_size,
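Note: below is a minimal sketch of what the commented-out transport override would look like if the TODO above were resolved. The num_worker_threads and rpc_timeout values mirror this diff; restricting TensorPipe to its libuv transport via _transports=["uv"] is an assumption, not something this commit enables.

import torch.distributed.rpc as rpc

# Sketch only: same backend options as in _init_dist_rpc above, plus the transport
# override that the TODO comment currently keeps disabled.
rpc_backend_options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=16,
                                                      rpc_timeout=6000,
                                                      _transports=["uv"])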
162 changes: 89 additions & 73 deletions energonai/model/model_factory.py
@@ -15,14 +15,18 @@
from energonai.utils import is_using_pp, get_current_device
from energonai.logging import get_dist_logger


def gelu_impl(x):
"""OpenAI's gelu implementation."""
return 0.5 * x * (1.0 + torch.tanh(0.7978845608028654 * x *
(1.0 + 0.044715 * x * x)))


def select_top_k(predictions, k=5):
predicted_index = random.choice(predictions[0, -1, :].sort(descending=True)[1][:10])  # .item()
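    # NOTE: the slice above keeps the 10 largest logits regardless of k, so the k argument
    # is effectively unused; one of those 10 token ids is then sampled uniformly.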
return predicted_index


class PipelineModel(nn.Module):
def __init__(self,
vocab_size: int = 50257,
@@ -37,14 +41,14 @@ def __init__(self,
padding_idx: int = 0,
dtype: dtype = torch.float16,
bias: bool = True,
apply_post_layernorm: bool = False,
first: bool = False,
last: bool = False,
fused_qkv: bool = True,
checkpoint: str = None,
model_name: str = None,
topk: int = 5,
is_decoder: bool = True) -> None:
super().__init__()

self.hidden_size = hidden_size
@@ -53,37 +57,37 @@ def __init__(self,
self.max_seq_len = max_seq_len
self.model_name = model_name
self.topk = topk

if first:
self.embed = Embedding1D(hidden_size=hidden_size,
vocab_size=vocab_size,
max_seq_len=max_seq_len,
num_tokentypes=num_tokentypes,
padding_idx=padding_idx,
dtype=dtype)

self.blocks = nn.ModuleList()
self.pp_rank = gpc.get_local_rank(ParallelMode.PIPELINE) if is_using_pp() else 0
for id_ in range(depth):
self.blocks.add_module(f'{id_ + self.pp_rank * depth}',
Block1D(hidden_size=hidden_size,
num_heads=num_heads,
mlp_ratio=mlp_ratio,
activation=activation,
layernorm_epsilon=layernorm_epsilon,
dtype=dtype,
bias=bias,
apply_post_layernorm=apply_post_layernorm,
max_seq_len=max_seq_len,
fused_qkv=fused_qkv,
is_decoder=is_decoder))
if last:
self.norm = LayerNorm1D(normalized_shape=hidden_size, eps=layernorm_epsilon)
self.head = LMHead1D(hidden_size=hidden_size, vocab_size=vocab_size, bias=False, dtype=dtype)

def forward(self, hidden_states=None, input_ids=None, attention_mask=None, seq_lens=None):
batch_size = input_ids.shape[0]

if self.first:
hidden_states = self.embed(input_ids)

@@ -94,20 +98,21 @@ def forward(self, hidden_states=None, input_ids=None, attention_mask=None, seq_l
attention_mask = (1.0 - attention_mask) * -10000.0

for block in self.blocks:
hidden_states = block(hidden_states, attention_mask)  # seq_lens

if self.last:
hidden_states = self.head(self.norm(hidden_states))
hidden_states = select_top_k(hidden_states, k=self.topk)

return hidden_states


def partition_uniform(num_items, pipeline_parallel_size):
logger = get_dist_logger()
assert num_items % pipeline_parallel_size == 0, \
    "The number of layers should be divisible by the pipeline parallel size; otherwise an explicit layer partition is recommended"

parts = [[] for _ in range(pipeline_parallel_size)]

base_idx = 0
chunk_size = num_items // pipeline_parallel_size
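    # e.g. partition_uniform(48, 4): chunk_size = 12, so each of the 4 pipeline stages
    # receives 12 consecutive layers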
@@ -122,7 +127,7 @@ def partition_uniform(num_items, pipeline_parallel_size):
return parts


def create_pipeline_model(depth: int = 48,
layer_partitions=None,
**model_kwargs):
logger = get_dist_logger()
@@ -150,105 +155,116 @@ def create_pipeline_model(depth:int = 48,
numel = 0
for _, param in model.named_parameters(recurse=True):
numel += param.numel()
logger.info(f'Rank{rank}/{pipeline_rank} model size = {numel * 2 / 1e9} GB')

if "checkpoint" in model_kwargs.keys() and "model_name" in model_kwargs.keys():
start = time.time()
assert os.path.exists(model_kwargs["checkpoint"]), "Checkpoint file not found"
if model_kwargs["model_name"] == "hf_gpt2":
from energonai.utils.checkpointing_hf_gpt2 import load_checkpoint
load_checkpoint(model_kwargs["checkpoint"], model, **model_kwargs)
if model_kwargs["model_name"] == "opt":
from energonai.utils.checkpointing_opt import load_checkpoint
load_checkpoint(model_kwargs["checkpoint"], model, **model_kwargs)
logger.info(f'Load time: {time.time() - start:.3f} s')

return model


def hf_gpt2(**kwargs):
model_kwargs = dict(hidden_size=768,
depth=12,
max_seq_len=1024,
num_heads=12,
fused_qkv=False,
model_name="hf_gpt2",
is_decoder=True,
**kwargs)
return create_pipeline_model(**model_kwargs)


def gpt2_small(**kwargs):
model_kwargs = dict(hidden_size=768, depth=12, num_heads=12, is_decoder=True, **kwargs)
return create_pipeline_model(**model_kwargs)


def gpt2_large(**kwargs):
model_kwargs = dict(hidden_size=1536, depth=36, num_heads=12, is_decoder=True, **kwargs)
return create_pipeline_model(**model_kwargs)


def gpt2_8B(**kwargs):
model_kwargs = dict(hidden_size=3072, depth=72, num_heads=24, is_decoder=True, **kwargs)
return create_pipeline_model(**model_kwargs)


def gpt3(**kwargs):
model_kwargs = dict(hidden_size=12288, depth=12, num_heads=96, is_decoder=True, **kwargs)
return create_pipeline_model(**model_kwargs)


def bert_small(**kwargs):
model_kwargs = dict(hidden_size=768, depth=12, num_heads=12, is_decoder=False, **kwargs)
return create_pipeline_model(**model_kwargs)


def bert_large(**kwargs):
model_kwargs = dict(hidden_size=1024, depth=24, num_heads=16, is_decoder=False, **kwargs)
return create_pipeline_model(**model_kwargs)


def bert_8B(**kwargs):
model_kwargs = dict(hidden_size=3072, depth=72, num_heads=24, is_decoder=False, **kwargs)
return create_pipeline_model(**model_kwargs)


def bert_175B(**kwargs):
model_kwargs = dict(hidden_size=12288, depth=96, num_heads=96, is_decoder=False, **kwargs)
return create_pipeline_model(**model_kwargs)


def opt_125M(**kwargs):
model_kwargs = dict(vocab_size=50272,
hidden_size=768,
depth=12,
max_seq_len=2050,
num_heads=12,
activation=nn.functional.relu,
is_decoder=True,
fused_qkv=False,
model_name="opt",
**kwargs)
return create_pipeline_model(**model_kwargs)


def opt_30B(**kwargs):
model_kwargs = dict(vocab_size=50272,
hidden_size=7168,
depth=48,
max_seq_len=2050,
num_heads=56,
activation=nn.functional.relu,
is_decoder=True,
fused_qkv=False,
model_name="opt",
**kwargs)
return create_pipeline_model(**model_kwargs)


def opt_66B(**kwargs):
model_kwargs = dict(vocab_size=50272,
hidden_size=9216,
depth=64,
max_seq_len=2050,
num_heads=72,
activation=nn.functional.relu,
is_decoder=True,
fused_qkv=False,
model_name="opt",
**kwargs)
return create_pipeline_model(**model_kwargs)



# def opt_175B(**kwargs):
# model_kwargs = dict(hidden_size=12288, depth=96, num_heads=96, activation=nn.functional.relu, is_decoder = True, **kwargs)
# return create_pipeline_model(**model_kwargs)
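As a rough usage sketch of the new OPT factory: it assumes the ColossalAI distributed context has already been launched (for example via launch_from_multiprocess) and uses a hypothetical checkpoint path; the path and the dtype override are illustrative and not part of this diff.

import torch
from energonai.model.model_factory import opt_125M

# Sketch only: passing "checkpoint" (together with model_name="opt", set inside opt_125M)
# makes create_pipeline_model call the OPT-specific load_checkpoint; the path is a placeholder.
model = opt_125M(checkpoint='/path/to/opt-125m.pt', dtype=torch.float16)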
18 changes: 9 additions & 9 deletions energonai/server/worker_server.py
@@ -7,13 +7,14 @@

logger = get_dist_logger('energonai')

app = FastAPI()


@app.get("/")
def root():
return {"200"}



@app.get("/shutdown")
async def shutdown():
rpc.shutdown()
@@ -22,21 +23,20 @@ async def shutdown():
await server.shutdown()


def launch_worker(config_file,
rank=0,
local_rank=0,
server_host="127.0.0.1",
server_port=8005):
mcfg.load_config(config_file)

world_size = mcfg['tp_init_size'] * mcfg['pp_init_size']

launch_from_multiprocess(mcfg['tp_init_size'], mcfg['pp_init_size'], mcfg['backend'],
                         mcfg['seed'], mcfg['verbose'], rank, local_rank, world_size,
                         mcfg['host'], mcfg['port'])


WORKER_NAME = "wok{}"
# _transports=["uv"] TODO: potentially a bug
rpc_backend_options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=16, rpc_timeout=6000)
rpc.init_rpc(WORKER_NAME.format(rank), rank=rank, world_size=world_size, rpc_backend_options=rpc_backend_options)
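A minimal sketch of invoking launch_worker directly with the signature above; the config path and rank values are placeholders, and the config file is expected to define the keys read through mcfg here (tp_init_size, pp_init_size, backend, seed, verbose, host, port).

from energonai.server.worker_server import launch_worker

# Sketch only: starts a single RPC worker process; the arguments below are illustrative
# values, not taken from this diff.
launch_worker('/path/to/energon_config.py', rank=1, local_rank=1,
              server_host='127.0.0.1', server_port=8005)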