
Commit

Make RPC shut down correctly
dujiangsu committed Mar 23, 2022
1 parent 439b8ed commit b25e20f
Showing 5 changed files with 68 additions and 88 deletions.
79 changes: 3 additions & 76 deletions energon/engine/engine.py
@@ -4,12 +4,13 @@
from torch.nn import Module
import torch.multiprocessing as mp
from functools import partial
import requests

# pytorch rpc
import torch.distributed.rpc as rpc
from .rpc_utils import remote_cls_method, sync_cls_method, async_cls_method
from .rpc_worker import RPCWorker

# depend on colossalai
from energon.core import global_context as gpc
from energon.context import ParallelMode
from energon.initialize import launch_from_torch, launch_from_multiprocess
@@ -18,35 +19,6 @@
from energon.logging import get_dist_logger
from energon.nn import PipelineCommWrapper

# from httpx import AsyncClient

# async def arouseRPC(servehost: str,
# serveport: int,
# tp_size: int,
# pp_size: int,
# backend: str,
# seed: int,
# verbose: bool,
# rank: int,
# local_rank: int,
# host: str,
# port: int):
# url = f'http://{servehost}:{serveport}/start/{tp_size}?pp_size={pp_size}&backend={backend}&seed={seed}&verbose={verbose}&rank={rank}&local_rank={local_rank}&host={host}&port={port}'
# print(url)
# async with AsyncClient(app = ap)

# dd = httpx.get(url)
# print(f'{dd.status_code}')

# def shutdownRPC(servehost: str,
# serveport: int):
# url = f'http://{servehost}:{serveport}/stop'
# print(url)
# dd = httpx.get(url)
# print(f'{dd.status_code}')




class InferenceEngine(Module):
def __init__(self,
@@ -79,7 +51,6 @@ def __init__(self,
self.global_world_size = pp_init_size * tp_init_size
self.host = host
self.port = port
self.processes = None
self.tp_size = tp_init_size
self.pp_size = pp_init_size

@@ -94,10 +65,6 @@ def _init_dist_rpc(self):
r'''
Based on global_context, init the rpc connection.
'''
# self.processes = launch_rpc(tp_size = self.tp_size, pp_size = self.pp_size, backend = 'nccl', seed = 1024, verbose = True, host = self.host, port = self.port)
# arouseRPC(servehost = self.host, serveport = 8005, tp_size = self.tp_size, pp_size = self.pp_size, backend = 'nccl', seed = 1024, verbose = True,
# rank = 1, local_rank = 1, host = self.host, port = self.port)

os.environ['MASTER_ADDR'] = self.host
os.environ['MASTER_PORT'] = f'{self.port}'
launch_from_multiprocess(tp_size = self.tp_size, pp_size = self.pp_size, rank = self.rank, local_rank = self.rank, world_size = self.global_world_size, host = self.host, port = self.port)
@@ -121,48 +88,8 @@ def run(self, inputs):


def clear(self):

rpc.shutdown()

for p in self.processes:
p.join()


# def process_func(tp_size: int = 1,
# pp_size:int = 1,
# backend: str = 'nccl',
# seed: int = 1024,
# verbose: bool = True,
# rank: int = 0,
# local_rank: int = 0,
# world_size:int = 1,
# host: str = 'localhost',
# port: int = 29500):

# os.environ['MASTER_ADDR'] = host
# os.environ['MASTER_PORT'] = f'{port}'

# launch_from_multiprocess(tp_size, pp_size, backend, seed, verbose, rank, local_rank, world_size, host, port)
# WORKER_NAME = "wok{}"
# rpc.init_rpc(WORKER_NAME.format(rank), rank=rank, world_size=world_size)
# rpc.shutdown()

# def launch_rpc(tp_size: int = 1,
# pp_size:int = 1,
# backend: str = 'nccl',
# seed: int = 1024,
# verbose: bool = True,
# host: str = 'localhost',
# port: int = 29500):

# world_size = pp_size * tp_size

# processes = []
# mp.set_start_method('spawn')
# for rank in range(world_size-1):
# p = mp.Process(target=process_func, args=(tp_size, pp_size, backend, seed, verbose, rank+1, rank+1, world_size, host, port))
# p.start()
# processes.append(p)

# return processes
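With this change the engine no longer spawns or joins local worker processes; the multiprocessing launcher and the commented-out HTTP helpers above are gone, and clear() reduces to a single rpc.shutdown() call. A minimal sketch of the driver-side lifecycle this leaves behind, assuming the default graceful semantics of torch.distributed.rpc; the name "wok0" mirrors the WORKER_NAME pattern in server.py, everything else here is illustrative rather than lifted from the repo:

import os
import torch.distributed.rpc as rpc

def driver(world_size: int, host: str = 'localhost', port: int = 29501):
    # Rank 0 joins the same RPC group as the uvicorn workers woken via /start.
    os.environ['MASTER_ADDR'] = host
    os.environ['MASTER_PORT'] = str(port)
    rpc.init_rpc("wok0", rank=0, world_size=world_size)
    # ... drive the model through rpc.remote() / rpc.rpc_async() calls ...
    # Graceful shutdown is a collective: it returns only after every rank has
    # called rpc.shutdown(), so this one call is what releases the workers.
    rpc.shutdown()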


4 changes: 2 additions & 2 deletions energon/engine/gpt_pipeline_wrapper.py
@@ -124,9 +124,9 @@ def pipeline_run(self, inputs):
# print(self.pipe_meta.get_meta_tensor_shape())
# print(self.pipe_meta.get_meta_tensor)
self.pipe_meta.store_meta(recv_forward(self.pipe_meta.get_meta_tensor_shape(), dtype=torch.int))
# print(self.pipe_meta.get_tensor_shapes())
print(self.pipe_meta.get_tensor_shapes())
input_tensor = recv_forward(self.pipe_meta.get_tensor_shapes(), dtype=self.dtype)
# print(input_tensor.shape)
print(input_tensor.shape)
self.comm_input[self.comm_name] = input_tensor
output = self.model(**self.comm_input, **self.static_input)
return output
10 changes: 5 additions & 5 deletions energon/engine/server.py
@@ -21,10 +21,10 @@ def init(tp_size: int, pp_size: int, backend: str, seed: int, verbose: bool, ran
launch_from_multiprocess(tp_size, pp_size, backend, seed, verbose, rank, local_rank, world_size, host, port)
WORKER_NAME = "wok{}"
rpc.init_rpc(WORKER_NAME.format(rank), rank=rank, world_size=world_size)

rpc.shutdown()
return {"Start!"}

@app.get("/stop")
def shutDown():
rpc.shutdown()
return {"Stop!"}
# @app.get("/stop")
# def shutDown():
# rpc.shutdown()
# return {"Stop!"}
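The worker side now performs its shutdown inside the same request that starts it: /start initializes RPC and then blocks in rpc.shutdown(), serving remote calls until rank 0 (the InferenceEngine driver) shuts the group down, so the separate /stop endpoint is retired. A sketch of the worker app this converges on, reconstructed from the visible hunk plus the /start URL in the shell script below; the route path, the FastAPI framework choice, and the world_size computation are assumptions where the hunk does not show them:

import torch.distributed.rpc as rpc
from fastapi import FastAPI
from energon.initialize import launch_from_multiprocess

app = FastAPI()
WORKER_NAME = "wok{}"

@app.get("/start/{tp_size}")
def init(tp_size: int, pp_size: int, backend: str, seed: int, verbose: bool,
         rank: int, local_rank: int, host: str, port: int):
    world_size = tp_size * pp_size  # assumed; world_size is not in the query string
    launch_from_multiprocess(tp_size, pp_size, backend, seed, verbose,
                             rank, local_rank, world_size, host, port)
    rpc.init_rpc(WORKER_NAME.format(rank), rank=rank, world_size=world_size)
    # The request parks here, serving RPCs, until the whole group shuts down.
    rpc.shutdown()
    return {"Start!"}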
29 changes: 24 additions & 5 deletions examples/evaluate.py
@@ -1,8 +1,11 @@
import time
import torch
import argparse
import requests
import threading
from concurrent import futures
# from model
from pipeline_gpt1d import GPT2_small_pipeline_1D, GPT2_exlarge_pipeline_1D, GPT3_pipeline_1D
from gpt.pipeline_gpt1d import GPT2_small_pipeline_1D, GPT2_exlarge_pipeline_1D, GPT3_pipeline_1D
from energon.engine import InferenceEngine
from energon.logging import get_dist_logger
from energon.core import global_context as gpc
@@ -40,13 +43,29 @@ def main():
hidden_states = None
sample = dict(hidden_states=hidden_states, input_ids=input_ids, attention_mask=attention_mask)

# print(MODEL_CLASSES[args.model_name])
engine = InferenceEngine(MODEL_CLASSES[args.model_name], model_config, max_batch_size = 32, tp_init_size = args.tensor_para_size, pp_init_size = args.pipe_para_size, port = 29501, dtype = torch.half)


for i in range(10):
output = engine.run(sample)
print(output.to_here())



# time.sleep(5)

# urls = ['http://127.0.0.1:8005/stop',
# 'http://127.0.0.1:8006/stop',
# 'http://127.0.0.1:8007/stop']

# with futures.ThreadPoolExecutor(max_workers=3) as executor:
# for url in urls:
# future = executor.submit(requests.get, url)


engine = InferenceEngine(MODEL_CLASSES[args.model_name], model_config, max_batch_size = 32, tp_init_size = args.tensor_para_size, pp_init_size = args.pipe_para_size, dtype = torch.half)
engine.clear()

output = engine.run(sample)

print(output.to_here())

# from model.pipeline_gpt1d import GPT2_small_pipeline_1D, GPT2_exlarge_pipeline_1D, GPT3_pipeline_1D
# prof = torch.profiler.profile(
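End to end, the evaluation script now drives that lifecycle: build the engine against RPC port 29501 (matching the shell script below), run a handful of batches, then tear the group down. A condensed view of the loop, reusing sample and model_config as defined earlier in evaluate.py and hard-coding the parallel sizes that the script actually reads from argparse:

engine = InferenceEngine(GPT2_exlarge_pipeline_1D, model_config,
                         max_batch_size=32, tp_init_size=2, pp_init_size=2,
                         port=29501, dtype=torch.half)
for _ in range(10):
    output = engine.run(sample)   # an RRef to the last pipeline stage's output
    print(output.to_here())       # pull the remote result back to this process
engine.clear()                    # rank 0 calls rpc.shutdown(), releasing every worker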
34 changes: 34 additions & 0 deletions examples/run_gpt_without_trition.sh
@@ -0,0 +1,34 @@
#!/bin/bash

tp_size=2
pp_size=2
model=gpt2_exlarge
world_size=`expr $tp_size \* $pp_size`
server_port_start=8005
host="localhost"
port=29501

export PYTHONPATH=/home/lcdjs/ColossalAI-Inference/examples

for ((i=1; i<${world_size}; i++))
do
server_port=`expr $server_port_start + $i`
uvicorn server:app --app-dir /home/lcdjs/ColossalAI-Inference/energon/engine/ --port ${server_port} &
echo "process: ${i} launches"
done

sleep 3

for ((i=1; i<${world_size}; i++))
do
server_port=`expr $server_port_start + $i`
curl -X 'GET' \
"http://127.0.0.1:${server_port}/start/${tp_size}?pp_size=${pp_size}&backend=nccl&seed=1024&verbose=true&rank=${i}&local_rank=${i}&host=${host}&port=${port}" \
-H 'accept: application/json' &
echo "http://127.0.0.1:${server_port}/start/${tp_size}?pp_size=${pp_size}&backend=nccl&seed=1024&verbose=true&rank=${i}&local_rank=${i}&host=${host}&port=${port}"
echo "evoke process: ${i} init rpc"
done

python3 evaluate.py --fp16 --model_name=${model} --tensor_para_size=${tp_size} --pipe_para_size=${pp_size}
# tritonserver --model-repository /opt/tritonserver/host/python_backend/models
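Taken together with the changes above, the script encodes the intended startup and shutdown order: world_size - 1 uvicorn workers come up first (rank 0 is evaluate.py itself), each /start request then blocks inside rpc.shutdown() while serving remote calls, and the final python3 evaluate.py line joins the group as rank 0. When evaluate.py reaches engine.clear(), the collective shutdown completes and the background curl requests return on their own, with no /stop call needed. The sleep 3 appears to be only a grace period for the uvicorn servers to bind their ports before the curls are issued.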
