diff --git a/docs/user-guides/server.md b/docs/user-guides/server.md
index 36d67b7cb..89a210104 100644
--- a/docs/user-guides/server.md
+++ b/docs/user-guides/server.md
@@ -162,25 +162,33 @@ executors:
 
 ### CLIP model config
 
-For PyTorch backend, you can set the following parameters via `with`:
+For PyTorch & ONNX backends, you can set the following parameters via `with`:
 
 | Parameter | Description |
 |-----------|--------------------------------------------------------------------------------------------------------------------------------|
 | `name` | Model weights, default is `ViT-B/32`. Support all OpenAI released pretrained models. |
+| `num_worker_preprocess` | The number of CPU workers for image & text preprocessing, default 4. |
+| `minibatch_size` | The size of a minibatch for CPU preprocessing and GPU encoding, default 64. Reduce the size of it if you encounter OOM on GPU. |
+| `pool_backend` | The backend of the preprocessing worker pool, default is `thread`. |
+
+There are also runtime-specific parameters listed below:
+
+````{tab} PyTorch
+
+| Parameter | Description |
+|-----------|--------------------------------------------------------------------------------------------------------------------------------|
 | `device` | `cuda` or `cpu`. Default is `None` means auto-detect. |
 | `jit` | If to enable Torchscript JIT, default is `False`. |
-| `num_worker_preprocess` | The number of CPU workers for image & text prerpocessing, default 4. |
-| `minibatch_size` | The size of a minibatch for CPU preprocessing and GPU encoding, default 64. Reduce the size of it if you encounter OOM on GPU. |
+
+````
 
-For ONNX backend, you can set the following parameters:
+````{tab} ONNX
 
 | Parameter | Description |
 |-----------|---------------------------------------------------------------------------------------------------|
-| `name` | Model name, default is `ViT-B/32`. |
 | `providers` | [ONNX runtime provides](https://onnxruntime.ai/docs/execution-providers/), default is auto-detect |
-| `num_worker_preprocess` | The number of CPU workers for image & text prerpocessing, default 4. |
-| `minibatch_size` | The size of a minibatch for CPU preprocessing and GPU encoding, default 64. Reduce the size of it if you encounter OOM on GPU. |
+
+````
 
 For example, to turn on JIT and force PyTorch running on CPU, one can do:
diff --git a/server/clip_server/executors/clip_onnx.py b/server/clip_server/executors/clip_onnx.py
index fab178f42..091b7ec0e 100644
--- a/server/clip_server/executors/clip_onnx.py
+++ b/server/clip_server/executors/clip_onnx.py
@@ -1,17 +1,13 @@
 import io
-import os
-from typing import TYPE_CHECKING, List, Sequence, Tuple
+from multiprocessing.pool import ThreadPool, Pool
+from typing import List, Sequence, Tuple
 
-import torch
 from PIL import Image
-from jina import Executor, requests
+from jina import Executor, requests, DocumentArray
 
 from clip_server.model import clip
 from clip_server.model.clip_onnx import CLIPOnnxModel
 
-if TYPE_CHECKING:
-    from docarray import DocumentArray, Document
-
 _SIZE = {
     'RN50': 224,
     'RN101': 224,
@@ -35,12 +31,16 @@ def __init__(
         ),
         num_worker_preprocess: int = 4,
         minibatch_size: int = 64,
+        pool_backend: str = 'thread',
         **kwargs
     ):
         super().__init__(**kwargs)
         self._preprocess = clip._transform(_SIZE[name])
         self._model = CLIPOnnxModel(name)
-        self._num_worker_preprocess = num_worker_preprocess
+        if pool_backend == 'thread':
+            self._pool = ThreadPool(processes=num_worker_preprocess)
+        else:
+            self._pool = Pool(processes=num_worker_preprocess)
         self._minibatch_size = minibatch_size
 
         self._model.start_sessions(providers=providers)
@@ -64,18 +64,14 @@ async def encode(self, docs: 'DocumentArray', **kwargs):
         # for image
         if _img_da:
             for minibatch in _img_da.map_batch(
-                self._preproc_image,
-                batch_size=self._minibatch_size,
-                num_worker=self._num_worker_preprocess,
+                self._preproc_image, batch_size=self._minibatch_size, pool=self._pool
             ):
                 minibatch.embeddings = self._model.encode_image(minibatch.tensors)
 
         # for text
         if _txt_da:
             for minibatch, _texts in _txt_da.map_batch(
-                self._preproc_text,
-                batch_size=self._minibatch_size,
-                num_worker=self._num_worker_preprocess,
+                self._preproc_text, batch_size=self._minibatch_size, pool=self._pool
             ):
                 minibatch.embeddings = self._model.encode_text(minibatch.tensors)
                 minibatch.texts = _texts
diff --git a/server/clip_server/executors/clip_torch.py b/server/clip_server/executors/clip_torch.py
index 33e6168bf..744eeec71 100644
--- a/server/clip_server/executors/clip_torch.py
+++ b/server/clip_server/executors/clip_torch.py
@@ -1,13 +1,12 @@
 import io
-from typing import TYPE_CHECKING, Optional, List, Tuple
+from multiprocessing.pool import ThreadPool, Pool
+from typing import Optional, List, Tuple
 
 import torch
 from PIL import Image
-from clip_server.model import clip
-from jina import Executor, requests
+from jina import Executor, requests, DocumentArray
 
-if TYPE_CHECKING:
-    from docarray import DocumentArray
+from clip_server.model import clip
 
 
 class CLIPEncoder(Executor):
@@ -18,6 +17,7 @@ def __init__(
         jit: bool = False,
         num_worker_preprocess: int = 4,
         minibatch_size: int = 64,
+        pool_backend: str = 'thread',
         **kwargs
     ):
         super().__init__(**kwargs)
@@ -25,9 +25,12 @@ def __init__(
             self._device = 'cuda' if torch.cuda.is_available() else 'cpu'
         else:
             self._device = device
-        self._num_worker_preprocess = num_worker_preprocess
         self._minibatch_size = minibatch_size
         self._model, self._preprocess = clip.load(name, device=self._device, jit=jit)
+        if pool_backend == 'thread':
+            self._pool = ThreadPool(processes=num_worker_preprocess)
+        else:
+            self._pool = Pool(processes=num_worker_preprocess)
 
     def _preproc_image(self, da: 'DocumentArray') -> 'DocumentArray':
         for d in da:
@@ -52,7 +55,7 @@ async def encode(self, docs: 'DocumentArray', **kwargs):
             for minibatch in _img_da.map_batch(
                 self._preproc_image,
                 batch_size=self._minibatch_size,
-                num_worker=self._num_worker_preprocess,
+                pool=self._pool,
             ):
                 minibatch.embeddings = (
                     self._model.encode_image(minibatch.tensors).cpu().numpy()
@@ -63,7 +66,7 @@ async def encode(self, docs: 'DocumentArray', **kwargs):
             for minibatch, _texts in _txt_da.map_batch(
                 self._preproc_text,
                 batch_size=self._minibatch_size,
-                num_worker=self._num_worker_preprocess,
+                pool=self._pool,
             ):
                 minibatch.embeddings = (
                     self._model.encode_text(minibatch.tensors).cpu().numpy()
diff --git a/server/setup.py b/server/setup.py
index 65835e48e..835a51509 100644
--- a/server/setup.py
+++ b/server/setup.py
@@ -41,7 +41,14 @@
     long_description_content_type='text/markdown',
     zip_safe=False,
     setup_requires=['setuptools>=18.0', 'wheel'],
-    install_requires=['ftfy', 'torch', 'regex', 'torchvision', 'jina>=3.2.10'],
+    install_requires=[
+        'ftfy',
+        'torch',
+        'regex',
+        'torchvision',
+        'jina>=3.2.10',
+        'docarray>=0.11.0',
+    ],
     extras_require={
         'onnx': ['onnxruntime', 'onnx', 'onnxruntime-gpu'],
     },
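
For anyone who wants to exercise the new `pool_backend` option end to end, here is a minimal sketch, not part of the diff above, of passing it (together with the existing `device`, `jit`, and `minibatch_size` options) to the PyTorch executor through a Jina Flow's `uses_with`. The port number and parameter values are illustrative only.

```python
# Sketch only: assumes the executor is importable from the path touched in this PR.
from jina import Flow

from clip_server.executors.clip_torch import CLIPEncoder

f = Flow(port=51000).add(
    uses=CLIPEncoder,
    uses_with={
        'device': 'cpu',           # force CPU, mirroring the docs example above
        'jit': True,               # enable TorchScript JIT
        'minibatch_size': 32,      # reduce if you hit OOM during encoding
        'pool_backend': 'thread',  # 'thread' (default); any other value selects a process pool
    },
)

with f:
    f.block()  # serve until interrupted
```

One note on the design: when `pool_backend` is anything other than `'thread'`, both executors fall back to `multiprocessing.pool.Pool`, so the preprocessing callables and the minibatches handed to them must be picklable; the `thread` default avoids that constraint, which presumably is why it was chosen. This relies on `DocumentArray.map_batch` accepting the `pool=` argument used in the diff, hence the `docarray>=0.11.0` pin added to `setup.py`.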