Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

General handler for open source data preprocessing #1302

Merged
merged 7 commits into from
Oct 12, 2022
Merged
177 changes: 150 additions & 27 deletions qlib/contrib/data/highfreq_handler.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from qlib.data.dataset.handler import DataHandler, DataHandlerLP

from .handler import check_transform_proc

EPSILON = 1e-4


Expand All @@ -15,20 +17,9 @@ def __init__(
fit_end_time=None,
drop_raw=True,
):
def check_transform_proc(proc_l):
new_l = []
for p in proc_l:
p["kwargs"].update(
{
"fit_start_time": fit_start_time,
"fit_end_time": fit_end_time,
}
)
new_l.append(p)
return new_l

infer_processors = check_transform_proc(infer_processors)
learn_processors = check_transform_proc(learn_processors)
infer_processors = check_transform_proc(infer_processors, fit_start_time, fit_end_time)
learn_processors = check_transform_proc(learn_processors, fit_start_time, fit_end_time)

data_loader = {
"class": "QlibDataLoader",
Expand Down Expand Up @@ -110,6 +101,103 @@ def get_normalized_price_feature(price_field, shift=0):
return fields, names


class HighFreqGeneralHandler(DataHandlerLP):
def __init__(
self,
instruments="csi300",
start_time=None,
end_time=None,
infer_processors=[],
learn_processors=[],
fit_start_time=None,
fit_end_time=None,
drop_raw=True,
day_length=240,
):
self.day_length = day_length

infer_processors = check_transform_proc(infer_processors, fit_start_time, fit_end_time)
learn_processors = check_transform_proc(learn_processors, fit_start_time, fit_end_time)

data_loader = {
"class": "QlibDataLoader",
"kwargs": {
"config": self.get_feature_config(),
"swap_level": False,
"freq": "1min",
},
}
super().__init__(
instruments=instruments,
start_time=start_time,
end_time=end_time,
data_loader=data_loader,
infer_processors=infer_processors,
learn_processors=learn_processors,
drop_raw=drop_raw,
)

def get_feature_config(self):
fields = []
names = []

template_if = "If(IsNull({1}), {0}, {1})"
template_paused = f"Cut({{0}}, {self.day_length * 2}, None)"

def get_normalized_price_feature(price_field, shift=0):
# norm with the close price of 237th minute of yesterday.
if shift == 0:
template_norm = f"{{0}}/DayLast(Ref({{1}}, {self.day_length * 2}))"
else:
template_norm = f"Ref({{0}}, " + str(shift) + f")/DayLast(Ref({{1}}, {self.day_length}))"

template_fillnan = "FFillNan({0})"
# calculate -> ffill -> remove paused
feature_ops = template_paused.format(
template_fillnan.format(
template_norm.format(template_if.format("$close", price_field), template_fillnan.format("$close"))
)
)
return feature_ops

fields += [get_normalized_price_feature("$open", 0)]
fields += [get_normalized_price_feature("$high", 0)]
fields += [get_normalized_price_feature("$low", 0)]
fields += [get_normalized_price_feature("$close", 0)]
fields += [get_normalized_price_feature("$vwap", 0)]
names += ["$open", "$high", "$low", "$close", "$vwap"]

fields += [get_normalized_price_feature("$open", self.day_length)]
fields += [get_normalized_price_feature("$high", self.day_length)]
fields += [get_normalized_price_feature("$low", self.day_length)]
fields += [get_normalized_price_feature("$close", self.day_length)]
fields += [get_normalized_price_feature("$vwap", self.day_length)]
names += ["$open_1", "$high_1", "$low_1", "$close_1", "$vwap_1"]

# calculate and fill nan with 0
fields += [
template_paused.format(
"If(IsNull({0}), 0, {0})".format(
f"{{0}}/Ref(DayLast(Mean({{0}}, {self.day_length * 30})), {self.day_length})".format("$volume")
)
)
]
names += ["$volume"]

fields += [
template_paused.format(
"If(IsNull({0}), 0, {0})".format(
f"Ref({{0}}, {self.day_length})/Ref(DayLast(Mean({{0}}, {self.day_length * 30})), {self.day_length})".format(
"$volume"
)
)
)
]
names += ["$volume_1"]

return fields, names


class HighFreqBacktestHandler(DataHandler):
def __init__(
self,
Expand Down Expand Up @@ -163,6 +251,53 @@ def get_feature_config(self):
return fields, names


class HighFreqGeneralBacktestHandler(DataHandler):
def __init__(
self,
instruments="csi300",
start_time=None,
end_time=None,
day_length=240,
):
self.day_length = day_length
data_loader = {
"class": "QlibDataLoader",
"kwargs": {
"config": self.get_feature_config(),
"swap_level": False,
"freq": "1min",
},
}
super().__init__(
instruments=instruments,
start_time=start_time,
end_time=end_time,
data_loader=data_loader,
)

def get_feature_config(self):
fields = []
names = []

template_paused = f"Cut({{0}}, {self.day_length * 2}, None)"
template_fillnan = "FFillNan({0})"
template_if = "If(IsNull({1}), {0}, {1})"
fields += [
template_paused.format(template_fillnan.format("$close")),
]
names += ["$close0"]

fields += [
template_paused.format(template_if.format(template_fillnan.format("$close"), "$vwap")),
]
names += ["$vwap0"]

fields += [template_paused.format("If(IsNull({0}), 0, {0})".format("$volume"))]
names += ["$volume0"]

return fields, names


class HighFreqOrderHandler(DataHandlerLP):
def __init__(
self,
Expand All @@ -175,20 +310,9 @@ def __init__(
fit_end_time=None,
drop_raw=True,
):
def check_transform_proc(proc_l):
new_l = []
for p in proc_l:
p["kwargs"].update(
{
"fit_start_time": fit_start_time,
"fit_end_time": fit_end_time,
}
)
new_l.append(p)
return new_l

infer_processors = check_transform_proc(infer_processors)
learn_processors = check_transform_proc(learn_processors)
infer_processors = check_transform_proc(infer_processors, fit_start_time, fit_end_time)
learn_processors = check_transform_proc(learn_processors, fit_start_time, fit_end_time)

data_loader = {
"class": "QlibDataLoader",
Expand Down Expand Up @@ -356,7 +480,6 @@ def get_feature_config(self):

template_if = "If(IsNull({1}), {0}, {1})"
template_paused = "Select(Gt($hx_paused_num, 1.001), {0})"
# template_paused = "{0}"
template_fillnan = "FFillNan({0})"
fields += [
template_fillnan.format(template_paused.format("$close")),
Expand Down