前言

最近需要搭建本地的量化回测平台来满足个性化的回测需求,填补聚宽、真格等在线回测平台的缺陷。之前没有做过相关的工作,所以打算先学习一下vn.py的回测模块的框架,读一下vn.py的源码。vnpy的源码可以从github上获取

但vn.py源码的注释比较少,本蒟蒻读起来比较吃力,在参考了网络上相关的解析之后,形成自己的一套解析思路,即自顶向下解析,把学习的过程记录下来,与诸位大佬分析。

在我个人学习的过程中,主要参考了知乎张世玉的vnpy源码解析系列文章。张先生的文章介绍得更为系统和详细,我写的这篇文章主要是希望可以提纲掣领,帮助和我一样的新手快速了解vnpy回测模块的架构。

解析思路

本文是我自己在阅读源码的时候的解析思路的记录,采用自顶向下的思路来逐层剥丝抽茧。也就是先从页面分析,追踪到每一个按钮绑定的槽函数,接着分析槽函数,追踪槽函数中调用的方法,分析数据的流向,知道把整个系统贯穿起来。

主干部分

界面

启动vnpy后,弹出如下界面,点击左侧菜单栏中的回测功能,可打开回测模块。
在这里插入图片描述
回测模块的窗口如下,有一个开始回测按钮,我们这里就分析点击开始回测按钮,会触发什么样的一系列程序。
在这里插入图片描述

界面源码

回测窗口的界面源码就在vnpy\app\cta_backtester\ui\widget.py中
在这里插入图片描述
该文件中定义了一个BacktesterManager类,该类是窗口类的子类,我们可以在其中查找“开始回测”按钮相关的语句。
在这里插入图片描述
我们找到了开始回测按钮定义的位置,发现点击开始回测按钮后,触发的是self.start_backtesting函数,于是我们转入分析BacktesterManager.start_backtesting函数。

BacktesterManager.start_backtesting函数

我们很快定位到了start_backtesting函数。
在这里插入图片描述
接下来我们分析start_backtesting函数中的代码。
在这里插入图片描述
可以看到,这个函数一开始是从窗口的输入框中读取用户输入的配置。对应窗口中左上角的位置。最后的save_json只是把配置以json的格式保存到了一个文件中。
在这里插入图片描述
而后面的几行代码,调用了BacktestingSettingEditor,主要是给用户修改策略中的参数。
在这里插入图片描述
BacktestingSettingEditor会弹出一个Dialog,内容如下,是配置策略中的参数的。
在这里插入图片描述
而在start_backtesting函数的最后,调用了backtester_engine下面的另一个start_backtesting函数。所以我们下面要分析这个新的start_backtesting函数。
在这里插入图片描述

BacktesterEngine.start_backtesting

最终新的start_backtesting函数,发现其是定义在vnpy\app\cta_backtester\engine.py文件中的BacktesterEngine类中。
在这里插入图片描述
找到该目录下的start_backtesting函数,发现其只是创建了一个子线程,该线程执行的是run_backtesting函数。

    def start_backtesting(
        self,
        class_name: str,
        vt_symbol: str,
        interval: str,
        start: datetime,
        end: datetime,
        rate: float,
        slippage: float,
        size: int,
        pricetick: float,
        capital: int,
        inverse: bool,
        setting: dict
    ):
        if self.thread:
            self.write_log("已有任务在运行中,请等待完成")
            return False

        self.write_log("-" * 40)
        self.thread = Thread(
            target=self.run_backtesting,
            args=(
                class_name,
                vt_symbol,
                interval,
                start,
                end,
                rate,
                slippage,
                size,
                pricetick,
                capital,
                inverse,
                setting
            )
        )
        self.thread.start()

        return True

BacktesterEngine.run_backtesting

继续追踪到run_backtesting函数,这个还是调用了其他的函数。主要是调用了engine对象的add_strategy()、load_data()和run_backtesting()函数。继续追踪,发现engine是BacktestingEngine类的一个对象,而该类是定义在vnpy-master\vnpy\app\cta_strategy\backtesting.py文件中的。
在这里插入图片描述

BacktestingEngine.add_strategy

该函数是将策略文件加载进来。在vnpy中,所有的策略文件都放在一个文件夹中,如下:
在这里插入图片描述
在上面的介绍的BacktesterEngine中有加载策略类的函数,该函数会在窗口初始化的时候被调用,把该上图目录下的策略文件以“类”的形式存储在字典变量BacktesterEngine.classes中。而add_strategy便是要实例化策略类,生成一个策略对象self.strategy.
在这里插入图片描述
上图便是把目录下的策略文件全部加载到self.classes的核心语句,该函数在vnpy-master\vnpy\app\cta_backtester\engine.py中。
下图是BacktestingEngine.add_strategy的代码,可以看到,该函数根据策略类和相关的参数实例化了一个策略对象。
在这里插入图片描述
下图是策略类的初始化函数,可以看到,其参数便是add_strategy中传入的参数。
在这里插入图片描述

BacktestingEngine.load_data

load_data,顾名思义,就是加载数据。该函数就是从数据库中加载历史数据,保存到self.history_data中。同时该函数调用了load_bar_data和load_tick_data两个函数,这两个函数比较麻烦,被封装的层数比较多,但也就是从数据库中获取数据而已,只是对sql语句的封装,所以这里不对这两个函数展开讲了。

    def load_data(self):
        """"""
        self.output("开始加载历史数据")

        if not self.end:
            self.end = datetime.now()

        if self.start >= self.end:
            self.output("起始日期必须小于结束日期")
            return

        self.history_data.clear()       # Clear previously loaded history data

        # Load 30 days of data each time and allow for progress update
        progress_delta = timedelta(days=30)
        total_delta = self.end - self.start
        interval_delta = INTERVAL_DELTA_MAP[self.interval]

        start = self.start
        end = self.start + progress_delta
        progress = 0

        while start < self.end:
            end = min(end, self.end)  # Make sure end time stays within set range

            if self.mode == BacktestingMode.BAR:
                data = load_bar_data(#这里调用了load_bar_data函数,从数据库中获取数据
                    self.symbol,
                    self.exchange,
                    self.interval,
                    start,
                    end
                )
            else:
                data = load_tick_data(
                    self.symbol,
                    self.exchange,
                    start,
                    end
                )

            self.history_data.extend(data)

            progress += progress_delta / total_delta
            progress = min(progress, 1)
            progress_bar = "#" * int(progress * 10)
            self.output(f"加载进度:{progress_bar} [{progress:.0%}]")

            start = end + interval_delta
            end += (progress_delta + interval_delta)

        self.output(f"历史数据加载完成,数据量:{len(self.history_data)}")

BacktestingEngine.run_backtesting

下面我们看回测引擎中最核心的部分。具体看我添加的中文注释。

    def run_backtesting(self):
        """"""
        if self.mode == BacktestingMode.BAR:#策略是基于bar还是tick,这里以bar为例
            func = self.new_bar
        else:
            func = self.new_tick

        self.strategy.on_init()#策略的初始化函数,后面会详细讨论,该函数给self.days和self.callback赋了值。

        # Use the first [days] of history data for initializing strategy
        day_count = 1
        ix = 0

        for ix, data in enumerate(self.history_data):#遍历历史数据,给策略初始化
            if self.datetime and data.datetime.day != self.datetime.day:
                day_count += 1
                if day_count >= self.days:
                    break

            self.datetime = data.datetime

            try:
                self.callback(data)#回调函数,实际上是策略类的on_bar函数
            except Exception:
                self.output("触发异常,回测终止")
                self.output(traceback.format_exc())
                return

        self.strategy.inited = True
        self.output("策略初始化完成")

        self.strategy.on_start()
        self.strategy.trading = True#更改状态
        self.output("开始回放历史数据")

        # Use the rest of history data for running backtesting
        for data in self.history_data[ix:]:#遍历历史数据
            try:
                func(data)#调用了self.new_bar
            except Exception:
                self.output("触发异常,回测终止")
                self.output(traceback.format_exc())
                return

        self.output("历史数据回放结束")

主要由三个关键的函数strategy.on_init()、callback()、func()(如果策略是基于bar的,func也就是self.new_bar),我们一个个分析.

strategy.on_init

我们以双均线策略中的on_init为例,可以看到其主要调用了load_bar函数。
在这里插入图片描述
而load_bar函数是在策略模板类(即策略类的父类)中定义的。如下图,我们可以看到,在load_bar函数中,将callback赋值为self.on_bar函数,再调用回测引擎中的load_bar函数。
在这里插入图片描述
我们追踪到回测引擎(也就是前面的BacktestingEngine)中的load_bar函数,该函数就是给self.callback和self.days赋值,其中days就是strategy.on_init中调用时传入的10,而callback就是在策略模板类中的load_bar调用时设置成的self.on_bar.
在这里插入图片描述
on_init函数是为了初始化策略,提前将10天的数据加载进来,方便后面计算均线等指标。

callback(strategy.on_bar)

上面分析了,callback就是策略类中的on_bar函数,所以我们查看策略类的on_bar函数。这里以双均线策略为例。

    def on_bar(self, bar: BarData):
        """
        Callback of new bar data update.
        """

        am = self.am
        am.update_bar(bar)
        if not am.inited:
            return

        fast_ma = am.sma(self.fast_window, array=True)
        self.fast_ma0 = fast_ma[-1]
        self.fast_ma1 = fast_ma[-2]

        slow_ma = am.sma(self.slow_window, array=True)
        self.slow_ma0 = slow_ma[-1]
        self.slow_ma1 = slow_ma[-2]

        cross_over = self.fast_ma0 > self.slow_ma0 and self.fast_ma1 < self.slow_ma1
        cross_below = self.fast_ma0 < self.slow_ma0 and self.fast_ma1 > self.slow_ma1

        if cross_over:
            if self.pos == 0:
                self.buy(bar.close_price, 1)
            elif self.pos < 0:
                self.cover(bar.close_price, 1)
                self.buy(bar.close_price, 1)

        elif cross_below:
            if self.pos == 0:
                self.short(bar.close_price, 1)
            elif self.pos > 0:
                self.sell(bar.close_price, 1)
                self.short(bar.close_price, 1)

        self.put_event()

可以看出,策略类中的on_bar函数里编写的就是我们策略的逻辑。根据信号,进行sell、buy等交易操作。而在on_bar中,还有一个am对象和am.update_bar()函数,这个我们放在后面分析,这里先看一下self.buy等函数。我们以buy为例。

strategy.on_buy
在这里插入图片描述
该函数的内容上图所示,其调用了send_order函数。该函数根据trading的值来做选择。而trading的值是在BacktestingEngine.run_backtesting中,策略初始化结束后,正式回测之前被置为True。所以在初始化阶段,该函数返回空列表,在回测阶段,该函数调用回测引擎的send_order函数。
在这里插入图片描述

BacktestingEngine.send_order

现在我们回到BacktestingEngine,分析其中的send_order类型。发现这个函数会根据订单的类型(stop还是limit),选择调用不同的函数。
在这里插入图片描述

BacktestingEngine.send_limit_order

我们以现价单为例,其函数内容如下:
在这里插入图片描述
该函数根据传入的参数,生成了一个订单对象,并将该对象存储到了active_limit_orders和limit_orders中。而vt_orderid属性则是根据订单的序号等信息生成的一个字符串,没有什么特殊的含义。

func

如果策略是基于bar的,那个func函数也就是new_bar函数。该函数会在遍历历史数据的时候被重复调用,我们查看new_bar函数。
在这里插入图片描述
func中调用了cross_limit_order和cross_stop_order,这两个函数是根据最新价来撮合成交的,订单信息就保存在上面提到的self.active_limit_orders中,同时更改仓位信息,把成交的信息保存在self.trades中。

    def cross_limit_order(self):
        """
        Cross limit order with last bar/tick data.
        """
        if self.mode == BacktestingMode.BAR:
            long_cross_price = self.bar.low_price
            short_cross_price = self.bar.high_price
            long_best_price = self.bar.open_price
            short_best_price = self.bar.open_price
        else:
            long_cross_price = self.tick.ask_price_1
            short_cross_price = self.tick.bid_price_1
            long_best_price = long_cross_price
            short_best_price = short_cross_price

        for order in list(self.active_limit_orders.values()):
            # Push order update with status "not traded" (pending).
            if order.status == Status.SUBMITTING:
                order.status = Status.NOTTRADED
                self.strategy.on_order(order)

            # Check whether limit orders can be filled.
            long_cross = (
                order.direction == Direction.LONG
                and order.price >= long_cross_price
                and long_cross_price > 0
            )

            short_cross = (
                order.direction == Direction.SHORT
                and order.price <= short_cross_price
                and short_cross_price > 0
            )

            if not long_cross and not short_cross:
                continue

            # Push order udpate with status "all traded" (filled).
            order.traded = order.volume
            order.status = Status.ALLTRADED
            self.strategy.on_order(order)

            self.active_limit_orders.pop(order.vt_orderid)

            # Push trade update
            self.trade_count += 1

            if long_cross:
                trade_price = min(order.price, long_best_price)
                pos_change = order.volume
            else:
                trade_price = max(order.price, short_best_price)
                pos_change = -order.volume

            trade = TradeData(
                symbol=order.symbol,
                exchange=order.exchange,
                orderid=order.orderid,
                tradeid=str(self.trade_count),
                direction=order.direction,
                offset=order.offset,
                price=trade_price,
                volume=order.volume,
                datetime=self.datetime,
                gateway_name=self.gateway_name,
            )

            self.strategy.pos += pos_change#更改仓位信息
            self.strategy.on_trade(trade)

            self.trades[trade.vt_tradeid] = trade#保存交易数据

而同时,new_bar函数还调用了策略的on_bar函数,上面已经分析过,on_bar函数会判断交易信号,产生订单,保存到self.active_limit_orders中。所以这个new_bar函数就是不断地根据新的bar来撮合已有的还未成交的订单成交,同时将bar传送给策略,策略产生新的订单。

new_bar中最后的一行,update_daily_close只是记录每个交易日的收盘价,没有特殊的内容。

至此,回测框架的流程基本上就分析完了。还有上面提到的策略中的am对象和am.update_bar()函数。

ArrayManager

策略中的am对象实际上是ArrayManager的实例化,该类可以存储历史数据列表,同时封装了一些常用的指标函数,如sma等。
而update_bar函数就是把最新的bar添加进去,把较远的bar删掉(也就是移动窗口,窗口的长度默认为100个bar)。
在这里插入图片描述

至此,我们已经把vnpy回测框架中最核心的部分都提取出来了。后面就是根据self.trades中的成交数据,计算损益等等指标了,这部分的内容等过几天再更新。

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐