This repository has been archived by the owner on Oct 16, 2023. It is now read-only.

triton run #14

Merged 1 commit on Mar 22, 2022
2 changes: 1 addition & 1 deletion energon/engine/__init__.py
@@ -1 +1 @@
from .engine import InferenceEngine, launch_rpc
from .engine import InferenceEngine
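
With this change, `launch_rpc` is no longer part of the public engine API; callers import only `InferenceEngine`, and worker ranks are instead brought up via the new `energon/engine/server.py` and `examples/run_gpt.sh`. A minimal caller sketch, mirroring `examples/evaluate.py` from this PR; the model class, `model_config` contents, and `sample` layout are illustrative assumptions, not values fixed by this diff:

```python
# Hedged usage sketch after this change: only InferenceEngine is imported from
# energon.engine. Model class, config and sample layout are assumptions.
import torch
from energon.engine import InferenceEngine
from pipeline_gpt1d import GPT2_small_pipeline_1D  # example model, as in examples/evaluate.py

model_config = {'dtype': torch.half}                      # assumed minimal config
sample = {'input_ids': torch.randint(0, 50257, (1, 64))}  # assumed input layout

engine = InferenceEngine(GPT2_small_pipeline_1D,
                         model_config,
                         max_batch_size=32,
                         tp_init_size=2,
                         pp_init_size=2,
                         dtype=torch.half)
output = engine.run(sample)   # returns an RRef; fetch the result explicitly
print(output.to_here())
engine.clear()
```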
110 changes: 68 additions & 42 deletions energon/engine/engine.py
@@ -18,51 +18,40 @@
from energon.logging import get_dist_logger
from energon.nn import PipelineCommWrapper


def process_func(tp_size: int = 1,
pp_size:int = 1,
backend: str = 'nccl',
seed: int = 1024,
verbose: bool = True,
rank: int = 0,
local_rank: int = 0,
world_size:int = 1,
host: str = 'localhost',
port: int = 29500):

os.environ['MASTER_ADDR'] = host
os.environ['MASTER_PORT'] = f'{port}'
# from httpx import AsyncClient

# async def arouseRPC(servehost: str,
# serveport: int,
# tp_size: int,
# pp_size: int,
# backend: str,
# seed: int,
# verbose: bool,
# rank: int,
# local_rank: int,
# host: str,
# port: int):
# url = f'http://{servehost}:{serveport}/start/{tp_size}?pp_size={pp_size}&backend={backend}&seed={seed}&verbose={verbose}&rank={rank}&local_rank={local_rank}&host={host}&port={port}'
# print(url)
# async with AsyncClient(app = ap)

launch_from_multiprocess(tp_size, pp_size, backend, seed, verbose, rank, local_rank, world_size, host, port)
WORKER_NAME = "wok{}"
rpc.init_rpc(WORKER_NAME.format(rank), rank=rank, world_size=world_size)
rpc.shutdown()

def launch_rpc(tp_size: int = 1,
pp_size:int = 1,
backend: str = 'nccl',
seed: int = 1024,
verbose: bool = True,
host: str = 'localhost',
port: int = 29500):

world_size = pp_size * tp_size

processes = []
for rank in range(world_size-1):
p = mp.Process(target=process_func, args=(tp_size, pp_size, backend, seed, verbose, rank+1, rank+1, world_size, host, port))
p.start()
processes.append(p)
# dd = httpx.get(url)
# print(f'{dd.status_code}')

return processes
# def shutdownRPC(servehost: str,
# serveport: int):
# url = f'http://{servehost}:{serveport}/stop'
# print(url)
# dd = httpx.get(url)
# print(f'{dd.status_code}')




class InferenceEngine(Module):
def __init__(self,
model_class,
model_config,
model_config,
max_batch_size: int = 1,
tp_init_size: int = -1,
pp_init_size: int = -1,
@@ -95,19 +84,20 @@ def __init__(self,
self.pp_size = pp_init_size

# for TP
self.rrefs = []

self.rrefs = []
# for rpc
self.WORKER_NAME = "wok{}"

self.WORKER_NAME = "wok{}"
self._init_dist_rpc()
self._init_model()

def _init_dist_rpc(self):
r'''
Based on global_context, init the rpc connection.
'''
self.processes = launch_rpc(tp_size = self.tp_size, pp_size = self.pp_size, backend = 'nccl', seed = 1024, verbose = True, host = self.host, port = self.port)
# self.processes = launch_rpc(tp_size = self.tp_size, pp_size = self.pp_size, backend = 'nccl', seed = 1024, verbose = True, host = self.host, port = self.port)
# arouseRPC(servehost = self.host, serveport = 8005, tp_size = self.tp_size, pp_size = self.pp_size, backend = 'nccl', seed = 1024, verbose = True,
# rank = 1, local_rank = 1, host = self.host, port = self.port)

os.environ['MASTER_ADDR'] = self.host
os.environ['MASTER_PORT'] = f'{self.port}'
launch_from_multiprocess(tp_size = self.tp_size, pp_size = self.pp_size, rank = self.rank, local_rank = self.rank, world_size = self.global_world_size, host = self.host, port = self.port)
@@ -137,6 +127,42 @@ def clear(self):
p.join()



# def process_func(tp_size: int = 1,
# pp_size:int = 1,
# backend: str = 'nccl',
# seed: int = 1024,
# verbose: bool = True,
# rank: int = 0,
# local_rank: int = 0,
# world_size:int = 1,
# host: str = 'localhost',
# port: int = 29500):

# os.environ['MASTER_ADDR'] = host
# os.environ['MASTER_PORT'] = f'{port}'

# launch_from_multiprocess(tp_size, pp_size, backend, seed, verbose, rank, local_rank, world_size, host, port)
# WORKER_NAME = "wok{}"
# rpc.init_rpc(WORKER_NAME.format(rank), rank=rank, world_size=world_size)
# rpc.shutdown()

# def launch_rpc(tp_size: int = 1,
# pp_size:int = 1,
# backend: str = 'nccl',
# seed: int = 1024,
# verbose: bool = True,
# host: str = 'localhost',
# port: int = 29500):

# world_size = pp_size * tp_size

# processes = []
# mp.set_start_method('spawn')
# for rank in range(world_size-1):
# p = mp.Process(target=process_func, args=(tp_size, pp_size, backend, seed, verbose, rank+1, rank+1, world_size, host, port))
# p.start()
# processes.append(p)

# return processes


8 changes: 4 additions & 4 deletions energon/engine/rpc_worker.py
@@ -2,7 +2,7 @@
import torch
import inspect
import torch.distributed.rpc as rpc

import sys
from energon.core import global_context as gpc
from energon.context import ParallelMode
from .rpc_utils import remote_cls_method, sync_cls_method, async_cls_method
@@ -26,17 +26,17 @@ def __init__(self,
self.rank = gpc.get_local_rank(ParallelMode.GLOBAL)

torch.cuda.set_device(f'cuda:{gpc.get_local_rank(ParallelMode.GLOBAL)}')

self._init_self()

def _init_self(self):
print("[INFO] init model in rank {}".format(self.rank))

# print(self.model_class)

if self.dtype == torch.half:
self.model = self.model_class(**self.model_config).cuda().half()
else:
self.model = self.model_class(**self.model_config).cuda()

# print("Pass")
self.model.eval()
self.model = GPTPipelineCommWrapper(model = self.model, max_batch_size = self.max_batch_size, dtype=self.dtype)

30 changes: 30 additions & 0 deletions energon/engine/server.py
@@ -0,0 +1,30 @@
import os
from fastapi import FastAPI
import torch.distributed.rpc as rpc
from energon.initialize import launch_from_multiprocess

app = FastAPI()  # create the FastAPI application object


@app.get("/") # 根路由
def root():
return {"200"}

@app.get("/start/{tp_size}")
def init(tp_size: int, pp_size: int, backend: str, seed: int, verbose: bool, rank: int, local_rank: int, host: str, port: int):
# http://127.0.0.1:8005/start/1?pp_size=1&backend=nccl&seed=1024&verbose=true&rank=0&local_rank=0&host=localhost&port=29500
# http://127.0.0.1:8005/start/1?pp_size=1&backend=nccl&seed=1024&verbose=true&rank=0&local_rank=0&host=localhost&port=29500
world_size = tp_size * pp_size

os.environ['MASTER_ADDR'] = host
os.environ['MASTER_PORT'] = f'{port}'
launch_from_multiprocess(tp_size, pp_size, backend, seed, verbose, rank, local_rank, world_size, host, port)
WORKER_NAME = "wok{}"
rpc.init_rpc(WORKER_NAME.format(rank), rank=rank, world_size=world_size)

return {"Start!"}

@app.get("/stop")
def shutDown():
rpc.shutdown()
return {"Stop!"}
12 changes: 6 additions & 6 deletions energon/registry/__init__.py
@@ -1,22 +1,22 @@
import torch.distributed.optim as dist_optim
import torch.nn as nn
import torch.optim as optim
import torchvision.models as tv_models
import torchvision.datasets as tv_datasets
from torchvision import transforms
# import torchvision.models as tv_models
# import torchvision.datasets as tv_datasets
# from torchvision import transforms

from .registry import Registry

LAYERS = Registry('layers', third_party_library=[nn])
LOSSES = Registry('losses')
MODELS = Registry('models', third_party_library=[tv_models])
# MODELS = Registry('models', third_party_library=[tv_models])
OPTIMIZERS = Registry('optimizers', third_party_library=[optim, dist_optim])
DATASETS = Registry('datasets', third_party_library=[tv_datasets])
# DATASETS = Registry('datasets', third_party_library=[tv_datasets])
DIST_GROUP_INITIALIZER = Registry('dist_group_initializer')
GRADIENT_HANDLER = Registry('gradient_handler')
LOSSES = Registry('losses', third_party_library=[nn])
HOOKS = Registry('hooks')
TRANSFORMS = Registry('transforms', third_party_library=[transforms])
# TRANSFORMS = Registry('transforms', third_party_library=[transforms])
DATA_SAMPLERS = Registry('data_samplers')
LR_SCHEDULERS = Registry('lr_schedulers')
SCHEDULE = Registry('schedules')
10 changes: 6 additions & 4 deletions model/gpt/evaluate.py → examples/evaluate.py
@@ -1,15 +1,17 @@
import time
import torch
import argparse

from model.pipeline_gpt1d import GPT2_small_pipeline_1D, GPT2_exlarge_pipeline_1D, GPT3_pipeline_1D
# from model
from pipeline_gpt1d import GPT2_small_pipeline_1D, GPT2_exlarge_pipeline_1D, GPT3_pipeline_1D
from energon.engine import InferenceEngine
from energon.logging import get_dist_logger
from energon.core import global_context as gpc
from energon.context import ParallelMode
from energon.utils import get_timers




MODEL_CLASSES = {
"gpt2_small": GPT2_small_pipeline_1D,
"gpt2_exlarge": GPT2_exlarge_pipeline_1D,
@@ -40,13 +42,13 @@ def main():

# print(MODEL_CLASSES[args.model_name])

engine = InferenceEngine(MODEL_CLASSES[args.model_name], model_config, max_batch_size = 32, tp_init_size = args.tensor_para_size, pp_init_size = args.pipe_para_size, dtype = torch.half)
engine = InferenceEngine(MODEL_CLASSES[args.model_name], model_config, max_batch_size = 32, tp_init_size = args.tensor_para_size, pp_init_size = args.pipe_para_size, dtype = torch.half)

output = engine.run(sample)

print(output.to_here())
engine.clear()

# from model.pipeline_gpt1d import GPT2_small_pipeline_1D, GPT2_exlarge_pipeline_1D, GPT3_pipeline_1D
# prof = torch.profiler.profile(
# schedule=torch.profiler.schedule(wait=1,
# warmup=1,
File renamed without changes.
34 changes: 34 additions & 0 deletions examples/run_gpt.sh
@@ -0,0 +1,34 @@
#!/bin/bash

tp_size=2
pp_size=2
model=gpt2_exlarge
world_size=`expr $tp_size \* $pp_size`
server_port_start=8005
host="localhost"
port=29500

export PYTHONPATH=/opt/tritonserver/host/ColossalAI-Inference/examples

for ((i=1; i<${world_size}; i++))
do
server_port=`expr $server_port_start + $i`
uvicorn server:app --app-dir /opt/tritonserver/host/ColossalAI-Inference/energon/engine/ --port ${server_port} &
echo "process: ${i} launches"
done

sleep 3

for ((i=1; i<${world_size}; i++))
do
server_port=`expr $server_port_start + $i`
curl -X 'GET' \
"http://127.0.0.1:${server_port}/start/${tp_size}?pp_size=${pp_size}&backend=nccl&seed=1024&verbose=true&rank=${i}&local_rank=${i}&host=${host}&port=${port}" \
-H 'accept: application/json' &
echo "http://127.0.0.1:${server_port}/start/${tp_size}?pp_size=${pp_size}&backend=nccl&seed=1024&verbose=true&rank=${i}&local_rank=${i}&host=${host}&port=${port}"
echo "evoke process: ${i} init rpc"
done

# python3 evaluate.py --fp16 --model_name=${model} --tensor_para_size=${tp_size} --pipe_para_size=${pp_size}
tritonserver --model-repository /opt/tritonserver/host/python_backend/models
