Skip to content

Commit

Permalink
Merge pull request #895 from ricequant/RQSDK-749
Browse files Browse the repository at this point in the history
更新bundle数据支持将错误统一输出
  • Loading branch information
Cuizi7 authored Aug 19, 2024
2 parents 762bc15 + 8c85790 commit e5a9355
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 7 deletions.
24 changes: 17 additions & 7 deletions rqalpha/data/bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from rqalpha.utils.datetime_func import convert_date_to_date_int, convert_date_to_int
from rqalpha.utils.i18n import gettext as _
from rqalpha.utils.functools import lru_cache
from rqalpha.utils.logger import init_logger, system_log
from rqalpha.environment import Environment
from rqalpha.model.instrument import Instrument

Expand Down Expand Up @@ -315,7 +316,7 @@ def __call__(self, path, fields, **kwargs):
while True:
order_book_ids = self._order_book_ids[i:i + step]
df = rqdatac.get_price(order_book_ids, START_DATE, datetime.date.today(), '1d',
adjust_type='none', fields=fields, expect_df=True)
adjust_type='none', fields=fields, expect_df=True)
if not (df is None or df.empty):
df.reset_index(inplace=True)
df['datetime'] = [convert_date_to_int(d) for d in df['date']]
Expand Down Expand Up @@ -356,9 +357,10 @@ def __call__(self, path, fields, **kwargs):
try:
h5 = h5py.File(path, 'a')
except OSError:
raise OSError("File {} update failed, if it is using, please update later, "
"or you can delete then update again".format(path))
try:
system_log.error("File {} update failed, if it is using, please update later, "
"or you can delete then update again".format(path))
yield 1
else:
is_futures = "futures" == os.path.basename(path).split(".")[0]
for order_book_id in self._order_book_ids:
# 特殊处理前复权合约,需要全量更新
Expand All @@ -367,8 +369,10 @@ def __call__(self, path, fields, **kwargs):
try:
last_date = int(h5[order_book_id]['datetime'][-1] // 1000000)
except OSError:
raise OSError("File {} update failed, if it is using, please update later, "
"or you can delete then update again".format(path))
system_log.error("File {} update failed, if it is using, please update later, "
"or you can delete then update again".format(path))
yield 1
break
except ValueError:
h5.pop(order_book_id)
start_date = START_DATE
Expand Down Expand Up @@ -406,12 +410,18 @@ def init_rqdatac_with_warnings_catch():
rqdatac.init()


def process_init_func():
init_rqdatac_with_warnings_catch()
init_logger()


def update_bundle(path, create, enable_compression=False, concurrency=1):
if create:
_DayBarTask = GenerateDayBarTask
else:
_DayBarTask = UpdateDayBarTask

init_logger()
kwargs = {}
if enable_compression:
kwargs['compression'] = 9
Expand All @@ -431,7 +441,7 @@ def update_bundle(path, create, enable_compression=False, concurrency=1):
)

with ProgressedProcessPoolExecutor(
max_workers=concurrency, initializer=init_rqdatac_with_warnings_catch
max_workers=concurrency, initializer=process_init_func
) as executor:
# windows上子进程需要执行rqdatac.init, 其他os则需要执行rqdatac.reset; rqdatac.init包含了rqdatac.reset的功能
for func in gen_file_funcs:
Expand Down
Binary file modified tests/outs/test_f_mean_reverting.pkl
Binary file not shown.

0 comments on commit e5a9355

Please sign in to comment.