使用 AggregatingMergeTree 表来做增量数据的聚合统计,包括物化视图的数据聚合。

股市及期货市场中的K线图的画法包含四个数据,即开盘价、最高价、最低价、收盘价。

以及单位时间内的成交量,成交单数,交易额等数据。

而不同时间周期K线可以通过基础数据聚合获得。比如3分钟K线聚合自3条1分钟K线。

创建明细表

创建一张基础表,这里以1分钟k线做基础表,通过1分钟聚合3分钟5分钟,及更多周期的k线数据。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
CREATE TABLE IF NOT EXISTS table_1m (
    start_time Int64,
    end_time Int64,
    open Float64,
    close Float64,
    high Float64,
    low Float64,
    volume Float64,
    trade_num Int64,
    quote_volume Float64,
    version DateTime
) ENGINE = ReplacingMergeTree(version)
PARTITION BY toYYYYMM(toDateTime(start_time, 'UTC'))
ORDER BY (start_time)
PRIMARY KEY (start_time)
COMMENT '1m kline data'
;

字段定义:start_time: 开盘时间,end_time: 收盘时间,open: 开盘价,close: 收盘价

high: 最高价,low: 最低价,volume: 成交量,trade_num: 成交笔数,quote_volume: 成交额

引擎选择 ReplaingMergeTree , start_time 做唯一主键和排序。

创建基于1分钟K线的物化视图

物化视图表引擎选择 AggregatingMergeTree , 字段添加 State 后缀,类型为 AggregateFunction

其能够以二进制的形式存储聚合数据的中间状态,聚合结果查询时需要调用 Merge 后缀函数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
CREATE MATERIALIZED VIEW IF NOT EXISTS table_3m_a
ENGINE = AggregatingMergeTree()
ORDER BY(start_interval)
POPULATE
AS
SELECT 
 toStartOfInterval(toDateTime(start_time / 1000, 'UTC'), INTERVAL 3 minute) AS start_interval,
 argMinState(open, start_time) AS open,
 argMaxState(close, end_time) AS close,
 minState(start_time) AS open_time,
 maxState(end_time) AS close_time,
 maxState(high) AS high,
 minState(low) AS low, 
 sumState(volume) AS volume,
 sumState(trade_num) AS trade_num,
 sumState(quote_volume) AS quote_volume
FROM table_1m
GROUP BY (start_interval)
;

当物化视图创建之后,如果源表被写入了新数据,那么物化视图也会同步更新。

POPULATE修饰符决定了物化视图的初始化策略:

  • 如果使用了POPULATE修饰符,那么在创建视图的过程中,会连带将源表中 已存在的数据一并导入,如同执行了SELECT INTO一般;

  • 反之,如果不使用POPULATE修饰符,那么物化视图在创建之后是没有数据的,它只会同步在此之后被写入源表的数据。

物化视图目前并不支持同步删除,如果在源表中删除了数据,物化视图的数据仍会保留。

物化视图本质上是一张特殊的数据表,可使用SHOW TABLE查看数据表的列表

创建基于物化视图的普通视图表

创建调用了Merge函数的普通视图,基于物化视图。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
CREATE VIEW IF NOT EXISTS table_3m_v
AS 
SELECT 
 start_interval,
 argMinMerge(open) AS open,
 argMaxMerge(close) AS close,
 minMerge(open_time) AS start_time,
 maxMerge(close_time) AS end_time,
 maxMerge(high) AS high,
 minMerge(low) AS low, 
 sumMerge(volume) AS volume,
 sumMerge(trade_num) AS trade_num,
 sumMerge(quote_volume) AS quote_volume
FROM table_3m_a
GROUP BY (start_interval)
;

创建普通视图方便简化后续的查询SQL。

插入数据

1
2
3
4
5
6
7
insert into table_1m (start_time, end_time, open, close, high, low, volume, trade_num, quote_volume) values 
(1672243200000, 1672243259999, '16578.29', '16576.39', '16578.29', '16569.08', '8.472423', 160, '140413.39080774'),
(1672243260000, 1672243319999, '16571.3', '16577.93', '16579.06', '16569.78', '7.709314', 221, '127788.02226838'),
(1672243320000, 1672243379999, '16578.17', '16573.26', '16578.24', '16567.36', '9.097228', 210, '150771.9145546'),
(1672243380000, 1672243439999, '16575.31', '16592.81', '16592.81', '16574.59', '9.788205', 337, '162330.90030962'),
(1672243440000, 1672243499999, '16593.27', '16599.96', '16602.6', '16592.58', '8.916786', 244, '148008.06068256'),
(1672243500000, 1672243559999, '16597.62', '16595.7', '16600.27', '16594.73', '7.25037', 155, '120334.92142551');

查询普通视图表

1
selct * from table_3m_v;

预期查询结果为2条聚合后的3分钟k线数据。

参考