Skip to content

Commit

Permalink
add default protocol_version (microsoft#677)
Browse files Browse the repository at this point in the history
* add default protocol_version

* add comment to serial.Serializable.get_backend
  • Loading branch information
zhupr authored Nov 10, 2021
1 parent 09c5142 commit 9eadaf8
Show file tree
Hide file tree
Showing 11 changed files with 139 additions and 127 deletions.
5 changes: 5 additions & 0 deletions qlib/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ def set_conf_from_C(self, config_c):
REG_CN = "cn"
REG_US = "us"

# pickle.dump protocol version: https://docs.python.org/3/library/pickle.html#data-stream-format
PROTOCOL_VERSION = 4

NUM_USABLE_CPU = max(multiprocessing.cpu_count() - 2, 1)

DISK_DATASET_CACHE = "DiskDatasetCache"
Expand Down Expand Up @@ -107,6 +110,8 @@ def set_conf_from_C(self, config_c):
# for simple dataset cache
"local_cache_path": None,
"kernels": NUM_USABLE_CPU,
# pickle.dump protocol version
"dump_protocol_version": PROTOCOL_VERSION,
# How many tasks belong to one process. Recommend 1 for high-frequency data and None for daily data.
"maxtasksperchild": None,
# If joblib_backend is None, use loky
Expand Down
9 changes: 3 additions & 6 deletions qlib/contrib/online/manager.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

import os
import pickle
import yaml
import pathlib
import pandas as pd
import shutil
from ..backtest.account import Account
from ..backtest.exchange import Exchange
from ...backtest.account import Account
from .user import User
from .utils import load_instance
from ...utils import save_instance, init_instance_by_config
from .utils import load_instance, save_instance
from ...utils import init_instance_by_config


class UserManager:
Expand Down
6 changes: 3 additions & 3 deletions qlib/contrib/online/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
import yaml
import pandas as pd
from ...data import D
from ...config import C
from ...log import get_module_logger
from ...utils import get_module_by_module_path, init_instance_by_config
from ...utils import get_next_trading_date
from ..backtest.exchange import Exchange
from ...backtest.exchange import Exchange

log = get_module_logger("utils")

Expand Down Expand Up @@ -42,7 +42,7 @@ def save_instance(instance, file_path):
"""
file_path = pathlib.Path(file_path)
with file_path.open("wb") as fr:
pickle.dump(instance, fr)
pickle.dump(instance, fr, C.dump_protocol_version)


def create_user_folder(path):
Expand Down
5 changes: 3 additions & 2 deletions qlib/data/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,11 @@ def load(self, instrument, start_index, end_index, freq):
raise ValueError("Invalid index range: {} {}".format(start_index, end_index))
try:
series = self._load_internal(instrument, start_index, end_index, freq)
except Exception:
except Exception as e:
get_module_logger("data").error(
f"Loading data error: instrument={instrument}, expression={str(self)}, "
f"start_index={start_index}, end_index={end_index}, freq={freq}"
f"start_index={start_index}, end_index={end_index}, freq={freq}. "
f"error info: {str(e)}"
)
raise
series.name = str(self)
Expand Down
10 changes: 5 additions & 5 deletions qlib/data/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ def visit(cache_path: Union[str, Path]):
d["meta"]["visits"] = d["meta"]["visits"] + 1
except KeyError:
raise KeyError("Unknown meta keyword")
pickle.dump(d, f)
pickle.dump(d, f, protocol=C.dump_protocol_version)
except Exception as e:
get_module_logger("CacheUtils").warning(f"visit {cache_path} cache error: {e}")

Expand Down Expand Up @@ -573,7 +573,7 @@ def gen_expression_cache(self, expression_data, cache_path, instrument, field, f
meta_path = cache_path.with_suffix(".meta")

with meta_path.open("wb") as f:
pickle.dump(meta, f)
pickle.dump(meta, f, protocol=C.dump_protocol_version)
meta_path.chmod(stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH)
df = expression_data.to_frame()

Expand Down Expand Up @@ -638,7 +638,7 @@ def update(self, sid, cache_uri, freq: str = "day"):
# update meta file
d["info"]["last_update"] = str(new_calendar[-1])
with meta_path.open("wb") as f:
pickle.dump(d, f)
pickle.dump(d, f, protocol=C.dump_protocol_version)
return 0


Expand Down Expand Up @@ -935,7 +935,7 @@ def gen_dataset_cache(self, cache_path: Union[str, Path], instruments, fields, f
"meta": {"last_visit": time.time(), "visits": 1},
}
with cache_path.with_suffix(".meta").open("wb") as f:
pickle.dump(meta, f)
pickle.dump(meta, f, protocol=C.dump_protocol_version)
cache_path.with_suffix(".meta").chmod(stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH)
# write index file
im = DiskDatasetCache.IndexManager(cache_path)
Expand Down Expand Up @@ -1057,7 +1057,7 @@ def update(self, cache_uri, freq: str = "day"):
# update meta file
d["info"]["last_update"] = str(new_calendar[-1])
with meta_path.open("wb") as f:
pickle.dump(d, f)
pickle.dump(d, f, protocol=C.dump_protocol_version)
return 0


Expand Down
205 changes: 103 additions & 102 deletions qlib/data/client.py
Original file line number Diff line number Diff line change
@@ -1,102 +1,103 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.


from __future__ import division
from __future__ import print_function

import socketio

import qlib
from ..log import get_module_logger
import pickle


class Client:
    """Connection helper used by ClientProvider.

    Holds a ``socketio.Client`` plus the server host and port, and exposes
    methods to connect, disconnect and send a single request to the server.
    """

    def __init__(self, host, port):
        """Create the underlying socket.io client and register log callbacks.

        Parameters
        ----------
        host : str
            server host; concatenated into the ``ws://`` URL on connect.
        port : int or str
            server port; converted with ``str()`` when building the URL.
        """
        super().__init__()
        self.sio = socketio.Client()
        self.server_host = host
        self.server_port = port
        self.logger = get_module_logger(self.__class__.__name__)

        # Log connection state changes at debug level.
        def _on_connect():
            self.logger.debug("Connect to server {}".format(self.sio.connection_url))

        def _on_disconnect():
            self.logger.debug("Disconnect from server!")

        self.sio.on("connect", _on_connect)
        self.sio.on("disconnect", _on_disconnect)

    def connect_server(self):
        """Connect to the server; a connection error is logged, not raised."""
        try:
            url = "ws://" + self.server_host + ":" + str(self.server_port)
            self.sio.connect(url)
        except socketio.exceptions.ConnectionError:
            self.logger.error("Cannot connect to server - check your network or server status")

    def disconnect(self):
        """Disconnect from the server; any failure is logged, not raised."""
        try:
            self.sio.eio.disconnect(True)
        except Exception as err:
            self.logger.error("Cannot disconnect from server : %s" % err)

    def send_request(self, request_type, request_content, msg_queue, msg_proc_func=None):
        """Send one request to the server and hand the outcome to ``msg_queue``.

        Parameters
        ----------
        request_type : str
            type of the request, 'calendar'/'instrument'/'feature'. The event
            ``request_type + "_request"`` is emitted and the reply is expected
            on ``request_type + "_response"``.
        request_content : dict
            information describing the request; it is pickled before sending.
        msg_queue : Queue
            queue that receives the message after the response callback ran:
            the (optionally processed) result, or an exception object.
        msg_proc_func : func, optional
            called with the ``"result"`` field of a successful response; its
            return value is what gets queued. If it raises, the exception is
            logged and queued instead.
        """
        header = {"version": qlib.__version__}

        def request_callback(*args):
            """Handle the server response; ``args[0]`` is the response content."""
            self.logger.debug("receive data and enter queue")
            msg = dict(args[0])

            # Surface the server-side detail text: error level for a bad
            # status, info level otherwise.
            if msg["detailed_info"] is not None:
                log = self.logger.info if msg["status"] == 0 else self.logger.error
                log(msg["detailed_info"])

            if msg["status"] != 0:
                # Failed request: the consumer receives the exception object.
                msg_queue.put(
                    ValueError(f"Bad response(status=={msg['status']}), detailed info: {msg['detailed_info']}")
                )
            elif msg_proc_func is None:
                msg_queue.put(msg["result"])
            else:
                try:
                    outcome = msg_proc_func(msg["result"])
                except Exception as err:
                    self.logger.exception("Error when processing message.")
                    outcome = err
                msg_queue.put(outcome)

            self.disconnect()
            self.logger.debug("disconnected")

        self.logger.debug("try connecting")
        self.connect_server()
        self.logger.debug("connected")
        # The body is pickled so that parameters with special types (such as
        # pd.Timestamp) can be passed along.
        payload = {"head": header, "body": pickle.dumps(request_content)}
        self.sio.on(request_type + "_response", request_callback)
        self.logger.debug("try sending")
        self.sio.emit(request_type + "_request", payload)
        self.sio.wait()
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.


from __future__ import division
from __future__ import print_function

import socketio

import qlib
from ..config import C
from ..log import get_module_logger
import pickle


class Client:
    """Connection helper used by ClientProvider.

    Holds a ``socketio.Client`` plus the server host and port, and exposes
    methods to connect, disconnect and send a single request to the server.
    """

    def __init__(self, host, port):
        """Create the underlying socket.io client and register log callbacks.

        Parameters
        ----------
        host : str
            server host; concatenated into the ``ws://`` URL on connect.
        port : int or str
            server port; converted with ``str()`` when building the URL.
        """
        super().__init__()
        self.sio = socketio.Client()
        self.server_host = host
        self.server_port = port
        self.logger = get_module_logger(self.__class__.__name__)

        # Log connection state changes at debug level.
        def _on_connect():
            self.logger.debug("Connect to server {}".format(self.sio.connection_url))

        def _on_disconnect():
            self.logger.debug("Disconnect from server!")

        self.sio.on("connect", _on_connect)
        self.sio.on("disconnect", _on_disconnect)

    def connect_server(self):
        """Connect to the server; a connection error is logged, not raised."""
        try:
            url = "ws://" + self.server_host + ":" + str(self.server_port)
            self.sio.connect(url)
        except socketio.exceptions.ConnectionError:
            self.logger.error("Cannot connect to server - check your network or server status")

    def disconnect(self):
        """Disconnect from the server; any failure is logged, not raised."""
        try:
            self.sio.eio.disconnect(True)
        except Exception as err:
            self.logger.error("Cannot disconnect from server : %s" % err)

    def send_request(self, request_type, request_content, msg_queue, msg_proc_func=None):
        """Send one request to the server and hand the outcome to ``msg_queue``.

        Parameters
        ----------
        request_type : str
            type of the request, 'calendar'/'instrument'/'feature'. The event
            ``request_type + "_request"`` is emitted and the reply is expected
            on ``request_type + "_response"``.
        request_content : dict
            information describing the request; it is pickled (using the
            configured ``C.dump_protocol_version``) before sending.
        msg_queue : Queue
            queue that receives the message after the response callback ran:
            the (optionally processed) result, or an exception object.
        msg_proc_func : func, optional
            called with the ``"result"`` field of a successful response; its
            return value is what gets queued. If it raises, the exception is
            logged and queued instead.
        """
        header = {"version": qlib.__version__}

        def request_callback(*args):
            """Handle the server response; ``args[0]`` is the response content."""
            self.logger.debug("receive data and enter queue")
            msg = dict(args[0])

            # Surface the server-side detail text: error level for a bad
            # status, info level otherwise.
            if msg["detailed_info"] is not None:
                log = self.logger.info if msg["status"] == 0 else self.logger.error
                log(msg["detailed_info"])

            if msg["status"] != 0:
                # Failed request: the consumer receives the exception object.
                msg_queue.put(
                    ValueError(f"Bad response(status=={msg['status']}), detailed info: {msg['detailed_info']}")
                )
            elif msg_proc_func is None:
                msg_queue.put(msg["result"])
            else:
                try:
                    outcome = msg_proc_func(msg["result"])
                except Exception as err:
                    self.logger.exception("Error when processing message.")
                    outcome = err
                msg_queue.put(outcome)

            self.disconnect()
            self.logger.debug("disconnected")

        self.logger.debug("try connecting")
        self.connect_server()
        self.logger.debug("connected")
        # The body is pickled so that parameters with special types (such as
        # pd.Timestamp) can be passed along.
        body = pickle.dumps(request_content, protocol=C.dump_protocol_version)
        payload = {"head": header, "body": body}
        self.sio.on(request_type + "_response", request_callback)
        self.logger.debug("try sending")
        self.sio.emit(request_type + "_request", payload)
        self.sio.wait()
5 changes: 3 additions & 2 deletions qlib/data/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -726,10 +726,11 @@ def expression(self, instrument, field, start_time=None, end_time=None, freq="da
lft_etd, rght_etd = expression.get_extended_window_size()
try:
series = expression.load(instrument, max(0, start_index - lft_etd), end_index + rght_etd, freq)
except Exception:
except Exception as e:
get_module_logger("data").error(
f"Loading expression error: "
f"instrument={instrument}, field=({field}), start_time={start_time}, end_time={end_time}, freq={freq}"
f"instrument={instrument}, field=({field}), start_time={start_time}, end_time={end_time}, freq={freq}. "
f"error info: {str(e)}"
)
raise
# Ensure that each column type is consistent
Expand Down
4 changes: 2 additions & 2 deletions qlib/data/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,12 +312,12 @@ def _load_internal(self, instrument, start_index, end_index, freq):
warning_info = (
f"Loading {instrument}: {str(self)}; np.{self.func}(series_left, series_right), "
f"The length of series_left and series_right is different: ({len(series_left)}, {len(series_right)}), "
f"series_left is {str(self.feature_left)}, series_right is {str(self.feature_left)}. Please check the data"
f"series_left is {str(self.feature_left)}, series_right is {str(self.feature_right)}. Please check the data"
)
else:
warning_info = (
f"Loading {instrument}: {str(self)}; np.{self.func}(series_left, series_right), "
f"series_left is {str(self.feature_left)}, series_right is {str(self.feature_left)}. Please check the data"
f"series_left is {str(self.feature_left)}, series_right is {str(self.feature_right)}. Please check the data"
)
try:
res = getattr(np, self.func)(series_left, series_right)
Expand Down
2 changes: 1 addition & 1 deletion qlib/utils/objm.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def create_path(self) -> str:

def save_obj(self, obj, name):
with (self.path / name).open("wb") as f:
pickle.dump(obj, f)
pickle.dump(obj, f, protocol=C.dump_protocol_version)

def save_objs(self, obj_name_l):
for obj, name in obj_name_l:
Expand Down
7 changes: 5 additions & 2 deletions qlib/utils/serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import dill
from pathlib import Path
from typing import Union
from ..config import C


class Serializable:
Expand Down Expand Up @@ -85,7 +86,8 @@ def to_pickle(self, path: Union[Path, str], dump_all: bool = None, exclude: list
"""
self.config(dump_all=dump_all, exclude=exclude)
with Path(path).open("wb") as f:
self.get_backend().dump(self, f)
# pickle interface like backend; such as dill
self.get_backend().dump(self, f, protocol=C.dump_protocol_version)

@classmethod
def load(cls, filepath):
Expand Down Expand Up @@ -116,6 +118,7 @@ def get_backend(cls):
Returns:
module: pickle or dill module based on pickle_backend
"""
# NOTE: pickle interface like backend; such as dill
if cls.pickle_backend == "pickle":
return pickle
elif cls.pickle_backend == "dill":
Expand All @@ -140,4 +143,4 @@ def general_dump(obj, path: Union[Path, str]):
obj.to_pickle(path)
else:
with path.open("wb") as f:
pickle.dump(obj, f)
pickle.dump(obj, f, protocol=C.dump_protocol_version)
Loading

0 comments on commit 9eadaf8

Please sign in to comment.