机会和风险相关的全部信息都蕴藏于数据之中”是一个量化交易员的基础信仰。

所以,一直以来我都有一个执念,就是想要收集全市场的实时交易数据。

国内有三家股票交易所、四家期货交易,共所涉及数万个标的,包括股票、债券、基金、指数、期货、期权等。

交易时段,全市场实时交易数据流就像一座开闸泄洪的水坝,想以优雅的姿态站在下游转存这些数据,并没有想象中那么简单。

经过数次迭代更新,终于实现了一个自己还算满意的方案。未来希望能从这些数据中发现更多有趣的事情。

标的

首先,就是要明确当前市场有哪些在交易的标的。

期货合约有固定的开始和结束时间,期权的合约还会根据行情变化随时加挂,股票虽然相对稳定,但也有新股上市的情况。

好在,TraderApi提供了查询全部标的列表的功能。虽然,我觉得这个查询应该放在MdApi中,但是官方这样设计,我们也就只能这样用。

int ReqQryInstrument(CThostFtdcQryInstrumentField *pQryInstrument, int nRequestID) = 0;

要用TraderApi,就要完成创建实例、连接柜台、看穿式认证、登录等一系列操作。

查询到全部标的之后,根据交易所标的类别分类存储,方便后续根据交易所或者类别查询到相关标的。

证券交易所有,上交所、深交所和北交所。期货交易所有,中金所、上期所、上能所、大商所、郑商所。

标的类别就比较多了,包括期货、期权、股票、基金、债券、指数等。

实时行情

接下来,需要使用MdApi订阅全部标的的实时行情。

MdApi收到推送的实时行情时会调用OnRtnDepthMarketData方法,所以,我们就需要把逻辑写到这个方法中。

由于订阅的标的比较多,要避免在这里进行效率比较低的操作,例如io。

我最初使用的方案是,把数据拷贝之后存放到list中:

void OnRtnDepthMarketData(CThostFtdcDepthMarketDataField *pDepthMarketDataField){    CThostFtdcDepthMarketDataField *pCopy = new CThostFtdcDepthMarketDataField;    memcpy(pCopy, pDepthMarketDataField, sizeof(CThostFtdcDepthMarketDataField));    std::lock_guard<std::mutex> lock(m_mutex);    m_lDataList.push_back(pCopy)}

在另一个线程中从list将数据取出来写进文件中或者存入数据库中:

void process(){    CThostFtdcDepthMarketDataField *pData = NULL;    {        std::lock_guard lock(m_mutex);        if (m_lDataList.size() > 0)        {            pData = m_lDataList.front();        }    }        if (NULL != pData)    {        saveToFile(pData);        delete pData;        std::lock_guard lock(m_mutex);        m_lDataList.pop_front();    }}

还有一种更高效的方案就是使用mmap将文件映射进内存,直接将收到的行情拷贝到映射出来的内存地址:

void OnRtnDepthMarketData(CThostFtdcDepthMarketDataField *pDepthMarketDataField){    m_pMMAPWriter->write(pDepthMarketDataField);}

但是,需要维护写入地址的指针,以及可能存在扩展文件等问题,所以,也相对比较复杂。

数据处理

当然,这一步并不需要实时来做。只要有tick数据源,随时可以处理需要的数据。

为了省去盘后的麻烦,我选择了实时合成1分钟K线数据、日线数据、期货加权指数,以及一些我感兴趣的组合价格数据。

K线的逻辑比较简单:

if (bIsMinuteStart){    Open = pData->LastPrice;    High = pData->LastPrice;    Low = pData->LastPrice;    Close = pData->LastPrice;}else{    Close = pData->LastPrice;    if (Close > High)    {        High = pData->LastPrice;    }    if (Close LastPrice;    }}

当然,还可以根据需要计算其他数据,例如,成交量、均价等。

期货加权指数价格,是把某一个品种所有月份合约的价格以持仓量为权重计算出来的均价。

组合价格则是按照一个公式计算出来的价格,例如:a*IC2210 + b*IC2210,通常a为1,b为-1。

线程管理

最后,实践中发现,为了提交效率,需要多个MdApi实例分别完成订阅任务。另外,程序可能会部署在多台服务器上同时运行。

这就要求能够通过配置文件分配订阅任务。

这也是前面提到的要根据交易所标的类别分类的主要目的。

//上期所期货#define ENUM_DataFlag_SHFEFuture 'a'//上期所期权#define ENUM_DataFlag_SHFEOption 'b'//上能所期货#define ENUM_DataFlag_INEFuture 'c'//上能所期权#define ENUM_DataFlag_INEOption 'd'……

如果配置DataFlaga,c,就表示这个实例负责订阅上期所和上能所的期货行情。

因为对数据处理部分的实时性没有太高的要求,所以我只使用了一个线程按照顺序读取所有行情实例写到本地的数据,合成k线、期货指数、组合价格,然后写入数据库,最后记录日志。

系统启动之后就用N个MdApi线程+1个数据处理线程。