diff --git a/energon/engine/engine.py b/energon/engine/engine.py
index 8bedf4a..be56b2f 100644
--- a/energon/engine/engine.py
+++ b/energon/engine/engine.py
@@ -4,12 +4,13 @@
 from torch.nn import Module
 import torch.multiprocessing as mp
 from functools import partial
 
+import requests
+
 # pytorch rpc
 import torch.distributed.rpc as rpc
 from .rpc_utils import remote_cls_method, sync_cls_method, async_cls_method
 from .rpc_worker import RPCWorker
-# depend on colossalai
 from energon.core import global_context as gpc
 from energon.context import ParallelMode
 from energon.initialize import launch_from_torch, launch_from_multiprocess
@@ -18,35 +19,6 @@
 from energon.logging import get_dist_logger
 from energon.nn import PipelineCommWrapper
 
-# from httpx import AsyncClient
-
-# async def arouseRPC(servehost: str,
-#                     serveport: int,
-#                     tp_size: int,
-#                     pp_size: int,
-#                     backend: str,
-#                     seed: int,
-#                     verbose: bool,
-#                     rank: int,
-#                     local_rank: int,
-#                     host: str,
-#                     port: int):
-#     url = f'http://{servehost}:{serveport}/start/{tp_size}?pp_size={pp_size}&backend={backend}&seed={seed}&verbose={verbose}&rank={rank}&local_rank={local_rank}&host={host}&port={port}'
-#     print(url)
-#     async with AsyncClient(app = ap)
-
-#     dd = httpx.get(url)
-#     print(f'{dd.status_code}')
-
-# def shutdownRPC(servehost: str,
-#                 serveport: int):
-#     url = f'http://{servehost}:{serveport}/stop'
-#     print(url)
-#     dd = httpx.get(url)
-#     print(f'{dd.status_code}')
-
-
-
 class InferenceEngine(Module):
 
     def __init__(self,
@@ -79,7 +51,6 @@ def __init__(self,
         self.global_world_size = pp_init_size * tp_init_size
         self.host = host
         self.port = port
-        self.processes = None
 
         self.tp_size = tp_init_size
         self.pp_size = pp_init_size
@@ -94,10 +65,6 @@ def _init_dist_rpc(self):
         r'''
        Based on global_context, init the rpc connection.
        '''
-        # self.processes = launch_rpc(tp_size = self.tp_size, pp_size = self.pp_size, backend = 'nccl', seed = 1024, verbose = True, host = self.host, port = self.port)
-        # arouseRPC(servehost = self.host, serveport = 8005, tp_size = self.tp_size, pp_size = self.pp_size, backend = 'nccl', seed = 1024, verbose = True,
-        #           rank = 1, local_rank = 1, host = self.host, port = self.port)
-
         os.environ['MASTER_ADDR'] = self.host
         os.environ['MASTER_PORT'] = f'{self.port}'
         launch_from_multiprocess(tp_size = self.tp_size, pp_size = self.pp_size, rank = self.rank, local_rank = self.rank, world_size = self.global_world_size, host = self.host, port = self.port)
@@ -121,48 +88,8 @@ def run(self, inputs):
 
 
     def clear(self):
+        rpc.shutdown()
-        for p in self.processes:
-            p.join()
 
 
-# def process_func(tp_size: int = 1,
-#                  pp_size:int = 1,
-#                  backend: str = 'nccl',
-#                  seed: int = 1024,
-#                  verbose: bool = True,
-#                  rank: int = 0,
-#                  local_rank: int = 0,
-#                  world_size:int = 1,
-#                  host: str = 'localhost',
-#                  port: int = 29500):
-
-#     os.environ['MASTER_ADDR'] = host
-#     os.environ['MASTER_PORT'] = f'{port}'
-
-#     launch_from_multiprocess(tp_size, pp_size, backend, seed, verbose, rank, local_rank, world_size, host, port)
-#     WORKER_NAME = "wok{}"
-#     rpc.init_rpc(WORKER_NAME.format(rank), rank=rank, world_size=world_size)
-#     rpc.shutdown()
-
-# def launch_rpc(tp_size: int = 1,
-#                pp_size:int = 1,
-#                backend: str = 'nccl',
-#                seed: int = 1024,
-#                verbose: bool = True,
-#                host: str = 'localhost',
-#                port: int = 29500):
-
-#     world_size = pp_size * tp_size
-
-#     processes = []
-#     mp.set_start_method('spawn')
-#     for rank in range(world_size-1):
-#         p = mp.Process(target=process_func, args=(tp_size, pp_size, backend, seed, verbose, rank+1, rank+1, world_size, host, port))
-#         p.start()
-#         processes.append(p)
-
-#     return processes
-
-
 
 
diff --git a/energon/engine/gpt_pipeline_wrapper.py b/energon/engine/gpt_pipeline_wrapper.py
index cbe16b2..13610fe 100644
--- a/energon/engine/gpt_pipeline_wrapper.py
+++ b/energon/engine/gpt_pipeline_wrapper.py
@@ -124,9 +124,9 @@ def pipeline_run(self, inputs):
             # print(self.pipe_meta.get_meta_tensor_shape())
             # print(self.pipe_meta.get_meta_tensor)
             self.pipe_meta.store_meta(recv_forward(self.pipe_meta.get_meta_tensor_shape(), dtype=torch.int))
-            # print(self.pipe_meta.get_tensor_shapes())
+            print(self.pipe_meta.get_tensor_shapes())
             input_tensor = recv_forward(self.pipe_meta.get_tensor_shapes(), dtype=self.dtype)
-            # print(input_tensor.shape)
+            print(input_tensor.shape)
             self.comm_input[self.comm_name] = input_tensor
             output = self.model(**self.comm_input, **self.static_input)
             return output
diff --git a/energon/engine/server.py b/energon/engine/server.py
index a53dba8..184d956 100644
--- a/energon/engine/server.py
+++ b/energon/engine/server.py
@@ -21,10 +21,10 @@ def init(tp_size: int, pp_size: int, backend: str, seed: int, verbose: bool, ran
     launch_from_multiprocess(tp_size, pp_size, backend, seed, verbose, rank, local_rank, world_size, host, port)
     WORKER_NAME = "wok{}"
     rpc.init_rpc(WORKER_NAME.format(rank), rank=rank, world_size=world_size)
-
+    rpc.shutdown()
     return {"Start!"}
 
-@app.get("/stop")
-def shutDown():
-    rpc.shutdown()
-    return {"Stop!"}
\ No newline at end of file
+# @app.get("/stop")
+# def shutDown():
+#     rpc.shutdown()
+#     return {"Stop!"}
\ No newline at end of file
diff --git a/examples/evaluate.py b/examples/evaluate.py
index bd1476b..32fc7d2 100644
--- a/examples/evaluate.py
+++ b/examples/evaluate.py
@@ -1,8 +1,11 @@
 import time
 import torch
 import argparse
+import requests
+import threading
+from concurrent import futures
 # from model
-from pipeline_gpt1d import GPT2_small_pipeline_1D, GPT2_exlarge_pipeline_1D, GPT3_pipeline_1D
+from gpt.pipeline_gpt1d import GPT2_small_pipeline_1D, GPT2_exlarge_pipeline_1D, GPT3_pipeline_1D
 from energon.engine import InferenceEngine
 from energon.logging import get_dist_logger
 from energon.core import global_context as gpc
@@ -40,13 +43,29 @@ def main():
     hidden_states = None
     sample = dict(hidden_states=hidden_states, input_ids=input_ids, attention_mask=attention_mask)
 
-    # print(MODEL_CLASSES[args.model_name])
+    engine = InferenceEngine(MODEL_CLASSES[args.model_name], model_config, max_batch_size = 32, tp_init_size = args.tensor_para_size, pp_init_size = args.pipe_para_size, port = 29501, dtype = torch.half)
+
+
+    for i in range(10):
+        output = engine.run(sample)
+        print(output.to_here())
+
+
+
+    # time.sleep(5)
+
+    # urls = ['http://127.0.0.1:8005/stop',
+    #         'http://127.0.0.1:8006/stop',
+    #         'http://127.0.0.1:8007/stop']
+
+    # with futures.ThreadPoolExecutor(max_workers=3) as executor:
+    #     for url in urls:
+    #         future = executor.submit(requests.get, url)
+
 
-    engine = InferenceEngine(MODEL_CLASSES[args.model_name], model_config, max_batch_size = 32, tp_init_size = args.tensor_para_size, pp_init_size = args.pipe_para_size, dtype = torch.half)
+    engine.clear()
 
-    output = engine.run(sample)
-    print(output.to_here())
 
 
     # from model.pipeline_gpt1d import GPT2_small_pipeline_1D, GPT2_exlarge_pipeline_1D, GPT3_pipeline_1D
     # prof = torch.profiler.profile(
diff --git a/examples/run_gpt_without_trition.sh b/examples/run_gpt_without_trition.sh
new file mode 100755
index 0000000..043a509
--- /dev/null
+++ b/examples/run_gpt_without_trition.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+
+tp_size=2
+pp_size=2
+model=gpt2_exlarge
+world_size=`expr $tp_size \* $pp_size`
+server_port_start=8005
+host="localhost"
+port=29501
+
+export PYTHONPATH=/home/lcdjs/ColossalAI-Inference/examples
+
+for ((i=1; i<${world_size}; i++))
+do
+server_port=`expr $server_port_start + $i`
+uvicorn server:app --app-dir /home/lcdjs/ColossalAI-Inference/energon/engine/ --port ${server_port} &
+echo "process: ${i} launches"
+done
+
+sleep 3
+
+for ((i=1; i<${world_size}; i++))
+do
+server_port=`expr $server_port_start + $i`
+curl -X 'GET' \
+"http://127.0.0.1:${server_port}/start/${tp_size}?pp_size=${pp_size}&backend=nccl&seed=1024&verbose=true&rank=${i}&local_rank=${i}&host=${host}&port=${port}" \
+-H 'accept: application/json' &
+echo "http://127.0.0.1:${server_port}/start/${tp_size}?pp_size=${pp_size}&backend=nccl&seed=1024&verbose=true&rank=${i}&local_rank=${i}&host=${host}&port=${port}"
+echo "evoke process: ${i} init rpc"
+done
+
+python3 evaluate.py --fp16 --model_name=${model} --tensor_para_size=${tp_size} --pipe_para_size=${pp_size}
+
+# tritonserver --model-repository /opt/tritonserver/host/python_backend/models
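Note on the launch flow introduced by this patch: run_gpt_without_trition.sh starts one uvicorn instance of energon/engine/server.py per non-zero rank, then issues a GET to each instance's /start endpoint so that rank calls launch_from_multiprocess and rpc.init_rpc; rank 0 is evaluate.py itself, which builds the InferenceEngine and later tears the RPC group down via engine.clear(). Because the patched /start handler only returns after rpc.shutdown(), each GET blocks until rank 0 calls engine.clear(), which is why the script backgrounds its curl calls with &. The sketch below drives the same /start requests from Python instead of curl, mirroring the commented-out ThreadPoolExecutor code left in evaluate.py; the host, ports, and parallel sizes are assumptions copied from the script's defaults, not part of the patch.

import requests
from concurrent import futures

# Assumed values, taken from run_gpt_without_trition.sh; adjust to the actual deployment.
tp_size, pp_size = 2, 2
world_size = tp_size * pp_size
host, rpc_port, server_port_start = "localhost", 29501, 8005

def start_rank(rank: int) -> int:
    # server.py exposes GET /start/{tp_size}; calling it makes the worker with this
    # rank join the torch RPC group (launch_from_multiprocess + rpc.init_rpc).
    url = (f"http://127.0.0.1:{server_port_start + rank}/start/{tp_size}"
           f"?pp_size={pp_size}&backend=nccl&seed=1024&verbose=true"
           f"&rank={rank}&local_rank={rank}&host={host}&port={rpc_port}")
    return requests.get(url).status_code

# Ranks 1..world_size-1 are the remote workers; each GET returns only after the whole
# RPC group shuts down, so the requests are issued concurrently rather than in sequence.
with futures.ThreadPoolExecutor(max_workers=world_size - 1) as executor:
    print(list(executor.map(start_rank, range(1, world_size))))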