This repository has been archived by the owner on Oct 16, 2023. It is now read-only.

add opt example #111

Merged: 1 commit merged on Aug 19, 2022
2 changes: 2 additions & 0 deletions .gitignore
@@ -127,3 +127,5 @@ dmypy.json

# Pyre type checker
.pyre/

.vscode/
10 changes: 5 additions & 5 deletions energonai/engine/engine.py
@@ -19,7 +19,6 @@
from energonai.initialize import launch_from_multiprocess



logger = get_dist_logger('energonai')


@@ -61,7 +60,7 @@ def __init__(self,
# for TP, PP
self.rrefs = []
self.auto_pp = auto_pp

# for rpc
self.WORKER_NAME = "wok{}"
self._init_dist_rpc()
@@ -79,9 +78,10 @@ def _init_dist_rpc(self):
world_size=self.global_world_size,
host=self.host,
port=self.port)
rpc_backend_options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=16, rpc_timeout=6000)
# _transports=["uv"] TODO: potentially a bug

rpc_backend_options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=16,
rpc_timeout=6000
# _transports=["uv"] TODO: potentially a bug
)
rpc.init_rpc(self.WORKER_NAME.format(0),
rank=0,
world_size=self.global_world_size,
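
Note (not part of the diff): the reformatting above keeps the standard torch.distributed.rpc pattern. A minimal self-contained sketch of that pattern, with placeholder name, rank, and world size, and assuming MASTER_ADDR/MASTER_PORT are already set in the environment (which launch_from_multiprocess appears to handle here):

import torch.distributed.rpc as rpc

# Placeholder values; the engine derives these from its own config.
options = rpc.TensorPipeRpcBackendOptions(
    num_worker_threads=16,  # threads serving incoming RPCs
    rpc_timeout=6000,       # seconds before an outstanding RPC times out
    # _transports=["uv"]    # kept commented out in the diff (flagged as a potential bug)
)
rpc.init_rpc("wok0",        # WORKER_NAME.format(0)
             rank=0,
             world_size=2,  # placeholder; the engine uses its global world size
             rpc_backend_options=options)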
162 changes: 89 additions & 73 deletions energonai/model/model_factory.py
@@ -15,14 +15,18 @@
from energonai.utils import is_using_pp, get_current_device
from energonai.logging import get_dist_logger


def gelu_impl(x):
"""OpenAI's gelu implementation."""
return 0.5 * x * (1.0 + torch.tanh(0.7978845608028654 * x *
(1.0 + 0.044715 * x * x)))
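# Note (not part of the diff): 0.7978845608028654 is sqrt(2/pi), so this is
# the tanh approximation of GELU used in OpenAI's GPT-2 code.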


def select_top_k(predictions, k=5):
predicted_index = random.choice(predictions[0, -1, :].sort(descending=True)[1][:10]) #.item()
predicted_index = random.choice(predictions[0, -1, :].sort(descending=True)[1][:10]) # .item()
return predicted_index


class PipelineModel(nn.Module):
def __init__(self,
vocab_size: int = 50257,
@@ -37,14 +41,14 @@ def __init__(self,
padding_idx: int = 0,
dtype: dtype = torch.float16,
bias: bool = True,
apply_post_layernorm:bool = False,
apply_post_layernorm: bool = False,
first: bool = False,
last: bool = False,
fused_qkv:bool = True,
checkpoint:str = None,
model_name:str = None,
topk:int = 5,
is_decoder:bool = True) -> None:
fused_qkv: bool = True,
checkpoint: str = None,
model_name: str = None,
topk: int = 5,
is_decoder: bool = True) -> None:
super().__init__()

self.hidden_size = hidden_size
@@ -53,37 +57,37 @@ def __init__(self,
self.max_seq_len = max_seq_len
self.model_name = model_name
self.topk = topk

if first:
self.embed = Embedding1D(hidden_size=hidden_size,
vocab_size=vocab_size,
max_seq_len=max_seq_len,
num_tokentypes = num_tokentypes,
padding_idx=padding_idx,
dtype=dtype)
vocab_size=vocab_size,
max_seq_len=max_seq_len,
num_tokentypes=num_tokentypes,
padding_idx=padding_idx,
dtype=dtype)

self.blocks = nn.ModuleList()
self.pp_rank = gpc.get_local_rank(ParallelMode.PIPELINE) if is_using_pp() else 0
for id_ in range(depth):
self.blocks.add_module(f'{id_ + self.pp_rank * depth}',
Block1D(hidden_size=hidden_size,
num_heads=num_heads,
mlp_ratio=mlp_ratio,
activation=activation,
layernorm_epsilon=layernorm_epsilon,
dtype=dtype,
bias=bias,
apply_post_layernorm=apply_post_layernorm,
max_seq_len=max_seq_len,
fused_qkv=fused_qkv,
is_decoder=is_decoder))
Block1D(hidden_size=hidden_size,
num_heads=num_heads,
mlp_ratio=mlp_ratio,
activation=activation,
layernorm_epsilon=layernorm_epsilon,
dtype=dtype,
bias=bias,
apply_post_layernorm=apply_post_layernorm,
max_seq_len=max_seq_len,
fused_qkv=fused_qkv,
is_decoder=is_decoder))
if last:
self.norm = LayerNorm1D(normalized_shape=hidden_size, eps=layernorm_epsilon)
self.norm = LayerNorm1D(normalized_shape=hidden_size, eps=layernorm_epsilon)
self.head = LMHead1D(hidden_size=hidden_size, vocab_size=vocab_size, bias=False, dtype=dtype)

def forward(self, hidden_states=None, input_ids=None, attention_mask=None, seq_lens=None):
batch_size = input_ids.shape[0]
batch_size = input_ids.shape[0]

if self.first:
hidden_states = self.embed(input_ids)

@@ -94,20 +98,21 @@ def forward(self, hidden_states=None, input_ids=None, attention_mask=None, seq_l
attention_mask = (1.0 - attention_mask) * -10000.0

for block in self.blocks:
hidden_states = block(hidden_states, attention_mask) # seq_lens
hidden_states = block(hidden_states, attention_mask) # seq_lens

if self.last:
hidden_states = self.head(self.norm(hidden_states))
hidden_states = select_top_k(hidden_states, k=self.topk)

return hidden_states


def partition_uniform(num_items, pipeline_parallel_size):
logger = get_dist_logger()
assert num_items % pipeline_parallel_size == 0, \
"Layer length should be divided by the number of pipeline size, otherwise parameter method is recomended"

parts = [[] for _ in range(pipeline_parallel_size)]
parts = [[] for _ in range(pipeline_parallel_size)]

base_idx = 0
chunk_size = num_items // pipeline_parallel_size
@@ -122,7 +127,7 @@ def partition_uniform(num_items, pipeline_parallel_size):
return parts
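# Illustrative note (not part of the diff): assuming the elided loop assigns one
# contiguous chunk per stage, num_items=48 and pipeline_parallel_size=4 give
# chunk_size=12 and parts like [[(0, 12)], [(12, 24)], [(24, 36)], [(36, 48)]],
# i.e. each pipeline stage owns 12 consecutive layers.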


def create_pipeline_model(depth:int = 48,
def create_pipeline_model(depth: int = 48,
layer_partitions=None,
**model_kwargs):
logger = get_dist_logger()
@@ -150,105 +155,116 @@ def create_pipeline_model(depth:int = 48,
numel = 0
for _, param in model.named_parameters(recurse=True):
numel += param.numel()
logger.info(f'Rank{rank}/{pipeline_rank} model size = {numel * 2 / 1e9} GB')
logger.info(f'Rank{rank}/{pipeline_rank} model size = {numel * 2 / 1e9} GB!!!')

if "checkpoint" in model_kwargs.keys() and "model_name" in model_kwargs.keys():
start = time.time()
assert os.path.exists(model_kwargs["checkpoint"]), "Checkpoint file not found"
if model_kwargs["model_name"] == "hf_gpt2":
from energonai.utils.checkpointing_hf_gpt2 import load_checkpoint
load_checkpoint(model_kwargs["checkpoint"], model, **model_kwargs)
if model_kwargs["model_name"] == "opt":
from energonai.utils.checkpointing_opt import load_checkpoint
load_checkpoint(model_kwargs["checkpoint"], model, **model_kwargs)
logger.info(f'Load time: {time.time() - start:.3f} s')

return model


def hf_gpt2(**kwargs):
model_kwargs = dict(hidden_size=768,
depth=12,
max_seq_len = 1024,
num_heads=12,
fused_qkv=False,
model_name = "hf_gpt2",
is_decoder = True,
model_kwargs = dict(hidden_size=768,
depth=12,
max_seq_len=1024,
num_heads=12,
fused_qkv=False,
model_name="hf_gpt2",
is_decoder=True,
**kwargs)
return create_pipeline_model(**model_kwargs)


def gpt2_small(**kwargs):
model_kwargs = dict(hidden_size=768, depth=12, num_heads=12, is_decoder = True, **kwargs)
model_kwargs = dict(hidden_size=768, depth=12, num_heads=12, is_decoder=True, **kwargs)
return create_pipeline_model(**model_kwargs)


def gpt2_large(**kwargs):
model_kwargs = dict(hidden_size=1536, depth=36, num_heads=12, is_decoder = True, **kwargs)
model_kwargs = dict(hidden_size=1536, depth=36, num_heads=12, is_decoder=True, **kwargs)
return create_pipeline_model(**model_kwargs)


def gpt2_8B(**kwargs):
model_kwargs = dict(hidden_size=3072, depth=72, num_heads=24, is_decoder = True, **kwargs)
model_kwargs = dict(hidden_size=3072, depth=72, num_heads=24, is_decoder=True, **kwargs)
return create_pipeline_model(**model_kwargs)


def gpt3(**kwargs):
model_kwargs = dict(hidden_size=12288, depth=12, num_heads=96, is_decoder = True, **kwargs)
model_kwargs = dict(hidden_size=12288, depth=12, num_heads=96, is_decoder=True, **kwargs)
return create_pipeline_model(**model_kwargs)


def bert_small(**kwargs):
model_kwargs = dict(hidden_size=768, depth=12, num_heads=12, is_decoder = False, **kwargs)
model_kwargs = dict(hidden_size=768, depth=12, num_heads=12, is_decoder=False, **kwargs)
return create_pipeline_model(**model_kwargs)


def bert_large(**kwargs):
model_kwargs = dict(hidden_size=1024, depth=24, num_heads=16, is_decoder = False, **kwargs)
model_kwargs = dict(hidden_size=1024, depth=24, num_heads=16, is_decoder=False, **kwargs)
return create_pipeline_model(**model_kwargs)


def bert_8B(**kwargs):
model_kwargs = dict(hidden_size=3072, depth=72, num_heads=24, is_decoder = False, **kwargs)
model_kwargs = dict(hidden_size=3072, depth=72, num_heads=24, is_decoder=False, **kwargs)
return create_pipeline_model(**model_kwargs)


def bert_175B(**kwargs):
model_kwargs = dict(hidden_size=12288, depth=96, num_heads=96, is_decoder = False, **kwargs)
model_kwargs = dict(hidden_size=12288, depth=96, num_heads=96, is_decoder=False, **kwargs)
return create_pipeline_model(**model_kwargs)


def opt_125M(**kwargs):
model_kwargs = dict(vocab_size=50272,
hidden_size=768,
depth=12,
hidden_size=768,
depth=12,
max_seq_len=2050,
num_heads=12,
activation=nn.functional.relu,
is_decoder = True,
fused_qkv=False,
model_name = "opt",
num_heads=12,
activation=nn.functional.relu,
is_decoder=True,
fused_qkv=False,
model_name="opt",
**kwargs)
return create_pipeline_model(**model_kwargs)


def opt_30B(**kwargs):
model_kwargs = dict(vocab_size=50272,
hidden_size=7168,
depth=48,
hidden_size=7168,
depth=48,
max_seq_len=2050,
num_heads=56,
activation=nn.functional.relu,
is_decoder = True,
fused_qkv=False,
model_name = "opt",
num_heads=56,
activation=nn.functional.relu,
is_decoder=True,
fused_qkv=False,
model_name="opt",
**kwargs)
return create_pipeline_model(**model_kwargs)


def opt_66B(**kwargs):
model_kwargs = dict(vocab_size=50272,
hidden_size=9216,
depth=64,
hidden_size=9216,
depth=64,
max_seq_len=2050,
num_heads=72,
activation=nn.functional.relu,
is_decoder = True,
fused_qkv=False,
model_name = "opt",
num_heads=72,
activation=nn.functional.relu,
is_decoder=True,
fused_qkv=False,
model_name="opt",
**kwargs)
return create_pipeline_model(**model_kwargs)



# def opt_175B(**kwargs):
# model_kwargs = dict(hidden_size=12288, depth=96, num_heads=96, activation=nn.functional.relu, is_decoder = True, **kwargs)
# return create_pipeline_model(**model_kwargs)
# return create_pipeline_model(**model_kwargs)
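
Illustrative usage of the new OPT factories (a sketch, not part of this PR): create_pipeline_model relies on the Colossal-AI distributed context, so this assumes launch_from_multiprocess (or an equivalent launch) has already run, and the checkpoint path is hypothetical.

import torch
from energonai.model.model_factory import opt_125M

# Hypothetical checkpoint path; create_pipeline_model asserts the file exists,
# and because opt_125M sets model_name="opt" it dispatches to
# energonai.utils.checkpointing_opt.load_checkpoint.
model = opt_125M(checkpoint="/path/to/opt-125m/pytorch_model.bin",
                 dtype=torch.float16)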
18 changes: 9 additions & 9 deletions energonai/server/worker_server.py
@@ -7,13 +7,14 @@

logger = get_dist_logger('energonai')

app = FastAPI()
app = FastAPI()


@app.get("/")
def root():
return {"200"}



@app.get("/shutdown")
async def shutdown():
rpc.shutdown()
@@ -22,21 +23,20 @@ async def shutdown():
await server.shutdown()


def launch_worker(config_file,
def launch_worker(config_file,
rank=0,
local_rank=0,
server_host="127.0.0.1",
server_port=8005):
mcfg.load_config(config_file)

world_size = mcfg['tp_init_size'] * mcfg['pp_init_size']

launch_from_multiprocess(mcfg['tp_init_size'], mcfg['pp_init_size'], mcfg['backend'],
mcfg['seed'], mcfg['verbose'], rank, local_rank, world_size,
mcfg['host'], mcfg['port'])
launch_from_multiprocess(mcfg['tp_init_size'], mcfg['pp_init_size'], mcfg['backend'],
mcfg['seed'], mcfg['verbose'], rank, local_rank, world_size,
mcfg['host'], mcfg['port'])


WORKER_NAME = "wok{}"
WORKER_NAME = "wok{}"
# _transports=["uv"] TODO: potentially a bug
rpc_backend_options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=16, rpc_timeout=6000)
rpc.init_rpc(WORKER_NAME.format(rank), rank=rank, world_size=world_size, rpc_backend_options=rpc_backend_options)
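
Illustrative driver (not part of this PR) showing how launch_worker might be spawned per rank; the config path and port offset are hypothetical, and in this codebase the engine process appears to take RPC rank 0, so workers start from rank 1.

from multiprocessing import Process
from energonai.server.worker_server import launch_worker

def start_workers(config_file="opt_config.py", world_size=2):
    procs = []
    for rank in range(1, world_size):  # rank 0 is held by the engine process
        p = Process(target=launch_worker,
                    kwargs=dict(config_file=config_file,
                                rank=rank,
                                local_rank=rank,
                                server_host="127.0.0.1",
                                server_port=8005 + rank))
        p.start()
        procs.append(p)
    return procs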