Skip to content

Commit

Permalink
perf(server): reuse the preprocessing pool (#670)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanxiao committed Mar 31, 2022
1 parent f0dfc34 commit 962a1b5
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 30 deletions.
22 changes: 15 additions & 7 deletions docs/user-guides/server.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,25 +162,33 @@ executors:

### CLIP model config

For PyTorch backend, you can set the following parameters via `with`:
For PyTorch & ONNX backend, you can set the following parameters via `with`:

| Parameter | Description |
|-----------|--------------------------------------------------------------------------------------------------------------------------------|
| `name` | Model weights, default is `ViT-B/32`. Support all OpenAI released pretrained models. |
| `num_worker_preprocess` | The number of CPU workers for image & text prerpocessing, default 4. |
| `minibatch_size` | The size of a minibatch for CPU preprocessing and GPU encoding, default 64. Reduce the size of it if you encounter OOM on GPU. |
| `pool_backend` | The backend of the preprocessing worker pool, default is `thread` |

There are also runtime-specific parameters listed below:

````{tab} PyTorch
| Parameter | Description |
|-----------|--------------------------------------------------------------------------------------------------------------------------------|
| `device` | `cuda` or `cpu`. Default is `None` means auto-detect. |
| `jit` | If to enable Torchscript JIT, default is `False`. |
| `num_worker_preprocess` | The number of CPU workers for image & text prerpocessing, default 4. |
| `minibatch_size` | The size of a minibatch for CPU preprocessing and GPU encoding, default 64. Reduce the size of it if you encounter OOM on GPU. |
````

For ONNX backend, you can set the following parameters:
````{tab} ONNX
| Parameter | Description |
|-----------|---------------------------------------------------------------------------------------------------|
| `name` | Model name, default is `ViT-B/32`. |
| `providers` | [ONNX runtime provides](https://onnxruntime.ai/docs/execution-providers/), default is auto-detect |
| `num_worker_preprocess` | The number of CPU workers for image & text prerpocessing, default 4. |
| `minibatch_size` | The size of a minibatch for CPU preprocessing and GPU encoding, default 64. Reduce the size of it if you encounter OOM on GPU. |
````

For example, to turn on JIT and force PyTorch running on CPU, one can do:

Expand Down
24 changes: 10 additions & 14 deletions server/clip_server/executors/clip_onnx.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
import io
import os
from typing import TYPE_CHECKING, List, Sequence, Tuple
from multiprocessing.pool import ThreadPool, Pool
from typing import List, Sequence, Tuple

import torch
from PIL import Image
from jina import Executor, requests
from jina import Executor, requests, DocumentArray

from clip_server.model import clip
from clip_server.model.clip_onnx import CLIPOnnxModel

if TYPE_CHECKING:
from docarray import DocumentArray, Document

_SIZE = {
'RN50': 224,
'RN101': 224,
Expand All @@ -35,12 +31,16 @@ def __init__(
),
num_worker_preprocess: int = 4,
minibatch_size: int = 64,
pool_backend: str = 'thread',
**kwargs
):
super().__init__(**kwargs)
self._preprocess = clip._transform(_SIZE[name])
self._model = CLIPOnnxModel(name)
self._num_worker_preprocess = num_worker_preprocess
if pool_backend == 'thread':
self._pool = ThreadPool(processes=num_worker_preprocess)
else:
self._pool = Pool(processes=num_worker_preprocess)
self._minibatch_size = minibatch_size
self._model.start_sessions(providers=providers)

Expand All @@ -64,18 +64,14 @@ async def encode(self, docs: 'DocumentArray', **kwargs):
# for image
if _img_da:
for minibatch in _img_da.map_batch(
self._preproc_image,
batch_size=self._minibatch_size,
num_worker=self._num_worker_preprocess,
self._preproc_image, batch_size=self._minibatch_size, pool=self._pool
):
minibatch.embeddings = self._model.encode_image(minibatch.tensors)

# for text
if _txt_da:
for minibatch, _texts in _txt_da.map_batch(
self._preproc_text,
batch_size=self._minibatch_size,
num_worker=self._num_worker_preprocess,
self._preproc_text, batch_size=self._minibatch_size, pool=self._pool
):
minibatch.embeddings = self._model.encode_text(minibatch.tensors)
minibatch.texts = _texts
Expand Down
19 changes: 11 additions & 8 deletions server/clip_server/executors/clip_torch.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import io
from typing import TYPE_CHECKING, Optional, List, Tuple
from multiprocessing.pool import ThreadPool, Pool
from typing import Optional, List, Tuple

import torch
from PIL import Image
from clip_server.model import clip
from jina import Executor, requests
from jina import Executor, requests, DocumentArray

if TYPE_CHECKING:
from docarray import DocumentArray
from clip_server.model import clip


class CLIPEncoder(Executor):
Expand All @@ -18,16 +17,20 @@ def __init__(
jit: bool = False,
num_worker_preprocess: int = 4,
minibatch_size: int = 64,
pool_backend: str = 'thread',
**kwargs
):
super().__init__(**kwargs)
if not device:
self._device = 'cuda' if torch.cuda.is_available() else 'cpu'
else:
self._device = device
self._num_worker_preprocess = num_worker_preprocess
self._minibatch_size = minibatch_size
self._model, self._preprocess = clip.load(name, device=self._device, jit=jit)
if pool_backend == 'thread':
self._pool = ThreadPool(processes=num_worker_preprocess)
else:
self._pool = Pool(processes=num_worker_preprocess)

def _preproc_image(self, da: 'DocumentArray') -> 'DocumentArray':
for d in da:
Expand All @@ -52,7 +55,7 @@ async def encode(self, docs: 'DocumentArray', **kwargs):
for minibatch in _img_da.map_batch(
self._preproc_image,
batch_size=self._minibatch_size,
num_worker=self._num_worker_preprocess,
pool=self._pool,
):
minibatch.embeddings = (
self._model.encode_image(minibatch.tensors).cpu().numpy()
Expand All @@ -63,7 +66,7 @@ async def encode(self, docs: 'DocumentArray', **kwargs):
for minibatch, _texts in _txt_da.map_batch(
self._preproc_text,
batch_size=self._minibatch_size,
num_worker=self._num_worker_preprocess,
pool=self._pool,
):
minibatch.embeddings = (
self._model.encode_text(minibatch.tensors).cpu().numpy()
Expand Down
9 changes: 8 additions & 1 deletion server/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,14 @@
long_description_content_type='text/markdown',
zip_safe=False,
setup_requires=['setuptools>=18.0', 'wheel'],
install_requires=['ftfy', 'torch', 'regex', 'torchvision', 'jina>=3.2.10'],
install_requires=[
'ftfy',
'torch',
'regex',
'torchvision',
'jina>=3.2.10',
'docarray>=0.11.0',
],
extras_require={
'onnx': ['onnxruntime', 'onnx', 'onnxruntime-gpu'],
},
Expand Down

0 comments on commit 962a1b5

Please sign in to comment.