历史:

在OLAP数据库中,可变数据(Mutable data)通常是不被欢迎的,Clickhouse也是如此,早期版本不支持UPDATE和DELTE操作。在Clickhouse 1.1.54388版本之后才支持UPDATE和DELETE操作,适用于MergeTree引擎,并且这种操作方式是异步的(asynchronous),但是在一些交互场景下很难使用。在一些场景下用户需要修改了数据即刻可以看到。这种实时更新(real-tiime update)在2020年4月可以在clickhouse中实现了。

在clickhouse 开源的2016年ClickHouse当时不支持数据修改。为了模拟更新,只能使用特殊的插入结构,并且数据必须由分区删除。

在GDPR要求的压力下,ClickHouse团队在2018年发布了UPDATE和DELETE。这些异步的,非原子的更新被实现为ALTER TABLE UPDATE语句,并且有可能shuffle 很多数据。当不需要立即知道结果时,这对于批量操作和不频繁更新很有用。

常规意义(Normal SQL)上的SQL语句update支持依旧在clickhouse中缺失,尽管它们每年确实出现在路线图中。如果需要实时更新行为,我们必须使用其他方法。让我们考虑一个实际的用例,并比较ClickHouse中使用它的不同方法。

概述:

Clickhouse提供了delete和update操作,这类操作被称之为Mutation查询,是ALTER语句的变种。虽然Mutation能最终实现修改和删除,但是不能完全以通常意义上的update和delete操作来理解。

1.Mutation操作适用于批量数据的修改和删除

2.不支持事务 一旦语句被提交执行就会立刻对现有的数据产生影响,无法回滚。

3.Mutation操作执行是一个异步的过程,语句提交会立即返回,但是不代表具体逻辑已经执行完毕,具体的执行记录需要在system.mutations系统表查询。

 

1.数据UPDATE和DELETE操作示例:

1.创建表:
CREATE TABLE city
(
    `id` UInt8, 
    `country` String, 
	area String,
    `province` String, 
    `city` String, 
    `create_time` datetime DEFAULT now()
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(create_time)
ORDER BY id;

2.插入数据:
insert into city(id,country,area,province,city) VALUES
(1,'China','North','Hubei','wuhan'),
(2,'China','South','Guangdong','guangzhou'),
(3,'China','South','Guangdong','shenzhen'),
(4,'China','North','Beijing','Beijing'),
(5,'China','South','Shanghai','Shanghai');

数据查询:
┌─id─┬─country─┬─area──┬─province──┬─city──────┬─────────create_time─┐
│  1 │ China   │ North │ Hubei     │ wuhan     │ 2020-06-26 16:11:21 │
│  2 │ China   │ South │ Guangdong │ guangzhou │ 2020-06-26 16:11:21 │
│  3 │ China   │ South │ Guangdong │ shenzhen  │ 2020-06-26 16:11:21 │
│  4 │ China   │ North │ Beijing   │ Beijing   │ 2020-06-26 16:11:21 │
│  5 │ China   │ South │ Shanghai  │ Shanghai  │ 2020-06-26 16:11:21 │
└────┴─────────┴───────┴───────────┴───────────┴─────────────────────┘

5 rows in set. Elapsed: 0.003 sec. 

3.update和delete操作:
-- UPDATE 操作:

ALTER TABLE city UPDATE area='South' WHERE city='wuhan';


Clickhouse>  select database,table,mutation_id,command,create_time,block_numbers.number no,is_done from system.mutations where table='city';


┌─database─┬─table─┬─mutation_id────┬─command────────────────────────────────────┬─────────create_time─┬─no──┬─is_done─┐
│ datasets │ city  │ mutation_5.txt │ UPDATE area = 'South' WHERE city = 'wuhan' │ 2020-06-26 16:12:50 │ [5] │       1 │
└──────────┴───────┴────────────────┴────────────────────────────────────────────┴─────────────────────┴─────┴─────────┘

1 rows in set. Elapsed: 0.004 sec.  
 
--DELETE 操作:
ALTER TABLE city DELETE WHERE city='guangzhou';

┌─database─┬─table─┬─mutation_id────┬─command────────────────────────────────────┬─────────create_time─┬─no──┬─is_done─┐
│ datasets │ city  │ mutation_5.txt │ UPDATE area = 'South' WHERE city = 'wuhan' │ 2020-06-26 16:12:50 │ [5] │       1 │
│ datasets │ city  │ mutation_6.txt │ DELETE WHERE city = 'guangzhou'            │ 2020-06-26 16:44:58 │ [6] │       1 │
└──────────┴───────┴────────────────┴────────────────────────────────────────────┴─────────────────────┴─────┴─────────┘

2 rows in set. Elapsed: 0.004 sec. 

4.查看目录:
# ls -l /var/lib/clickhouse/data/datasets/city/
total 20
drwxr-x---. 2 clickhouse clickhouse 4096 Jun 26 16:12 202006_4_4_0_5
drwxr-x---. 2 clickhouse clickhouse 4096 Jun 26 16:44 202006_4_4_0_6
drwxr-x---. 2 clickhouse clickhouse    6 Jun 26 16:02 detached
-rw-r-----. 1 clickhouse clickhouse    1 Jun 26 16:02 format_version.txt
-rw-r-----. 1 clickhouse clickhouse  109 Jun 26 16:12 mutation_5.txt
-rw-r-----. 1 clickhouse clickhouse   96 Jun 26 16:44 mutation_6.txt


 # cat /var/lib/clickhouse/data/datasets/city/mutation_5.txt 
format version: 1
create time: 2020-06-26 16:12:50
commands: UPDATE area = \'South\' WHERE city = \'wuhan\' 
 # cat /var/lib/clickhouse/data/datasets/city/mutation_6.txt  
format version: 1
create time: 2020-06-26 16:44:58
commands: DELETE WHERE city = \'guangzhou\' 

可以发现在执行了update,delete操作之后数据目录会生成文件mutation_5.txt,mutation_6.txt。
此外还有在同名的
目录下在末尾增加了_5 ,_6的后缀。
可以看到mutation_5.txt和mutation_6.txt 是日志文件,完整的记录了update和delete操作语句和时间。
mutation_id:生成对应的日志文件用于记录相关的信息。
数据删除的过程是以数据表的每个分区目录为单位,将所有目录重写为新的目录,在目录的命名规则是在原有的名称上加上 block_numbers.number
数据的在重写的过程中会讲所需要删除的数据去掉。旧的数据并不会立即删除,而是被标记为非激活状态(active =0),
等到MergeTree引擎的下一次合并动作触发的时候,
这些非活动目录才会被真正的从物理上删除。


UPDATE语句不能修改分区键和主键。

2.数据的实时更新操作(Real-time UPDATE):

应用场景:
假设一个收集各种信息的告警系统,用户或者机器学习算法不时的查询数据库库,以确认最新的告警信息。
若确认了
最新的消息需要修改数据库的告警记录,然后警报消息从用户视图中消除。这看起来像是Clickhouse的OLTP操作。



由于我们无法使用更新,因此我们将不得不插入修改后的记录。 一旦数据库中有两条记录,我们需要一种有效的方法来
获取最新的一条。 为此,我们将尝试3种不同的方法:
ReplacingMergeTree
Aggregate functions
AggregatingMergeTree 

1.创建表:
CREATE TABLE alerts(
  tenant_id     UInt32,
  alert_id      String,
  timestamp     DateTime Codec(Delta, LZ4),
  alert_data    String,
  acked         UInt8 DEFAULT 0,
  ack_time      DateTime DEFAULT toDateTime(0),
  ack_user      LowCardinality(String) DEFAULT ''
)
ENGINE = ReplacingMergeTree(ack_time)
PARTITION BY tuple()
ORDER BY (tenant_id, timestamp, alert_id);

为了简单便于演示,alert_id使用一个随机数,告警信息写入字段alert_data。实际上告警信息表可能有更多的字段信息。

ReplacecingMergeTee是一种特殊的表引擎,它用主键(ORDER BY)替换数据-具有相同键值的行的较新版本将替换较旧的行。
在最新的(Newness)的数据信息由列ack_time确定,数据替换在后台合并操作期间执行。 它不会立即发生,也无法完全保证会发生,因此查询结果的一致性是一个问题。 不过,ClickHouse具有处理此类表的特殊语法:

 2.模拟数据:模拟1000个租户,产生1000万条记录。
INSERT INTO alerts(tenant_id, alert_id, timestamp, alert_data)
SELECT
  toUInt32(rand(1)%1000+1) AS tenant_id,
  randomPrintableASCII(32) as alert_id,
  toDateTime('2020-01-01 00:00:00') + rand(2)%(3600*24*30) as timestamp,
  randomPrintableASCII(256) as alert_data
FROM numbers(10000000);


3.我们让99%的数据标记为acked,由字段ack_user ack_time 确认。我们插入新的行,用以替代update操作。
INSERT INTO alerts (tenant_id, alert_id, timestamp, alert_data, acked, ack_user, ack_time)
SELECT tenant_id, alert_id, timestamp, alert_data, 
  1 as acked, 
  concat('user', toString(rand()%1000)) as ack_user,  now() as ack_time
FROM alerts WHERE cityHash64(alert_id) % 99 != 0;


若立即查询有多少记录则反馈如下:
SELECT count() FROM alerts;
┌──count()─┐
│ 19898060 │
└──────────┘
1 rows in set. Elapsed: 0.008 sec

此时可以确认 确认的数据和未确认的数据都包含在其中,数据的替换操作并未发生。为了查看真实的数据,我们使用关键字FINAL。


Clickhouse> select count(1) from alerts FINAL;

SELECT count(1)
FROM alerts
FINAL

┌─count(1)─┐
│ 10000000 │
└──────────┘

1 rows in set. Elapsed: 1.293 sec. Processed 10.00 million rows, 530.00 MB 
(7.74 million rows/s., 409.97 MB/s.) 

现在该计数是正确的,但查询时间增加了很多! 使用FINAL,ClickHouse必须扫描所有行并在查询时间内通过主键合并它们。 
这会产生正确的答案,但会带来很多开销。 让我们看看是否可以通过仅过滤未确认的行来做得更好。

Clickhouse> SELECT count() FROM alerts FINAL WHERE NOT acked;

SELECT count()
FROM alerts
FINAL
WHERE NOT acked

┌─count()─┐
│  100696 │
└─────────┘

1 rows in set. Elapsed: 0.740 sec. Processed 10.00 million rows, 540.00 MB 
(13.51 million rows/s., 729.55 MB/s.) 


查询时间和数据总量是一样的,尽管计数更小一些。过滤了反而没有加速查询,随着数据规模的增加,查询的成本会增加而不是缩小。


ok 查询整个表 并没有很大的帮助,在我们的场景下还要继续使用ReplacingMergeTree 引起吗?
现在让我们随机查询租户ID(tenant_id)并且查询的所有的数据尚未确认。类似于有一个用户正在查看的仪表板。
由于“ alert_data”只是随机垃圾,因此我们将计算校验和,并使用它来确认结果在多种方法中是相同的:

Clickhouse> SELECT count(),sum(cityHash64(*)) AS data FROM alerts FINAL WHERE (tenant_id = 451) AND (NOT acked);

SELECT 
    count(), 
    sum(cityHash64(*)) AS data
FROM alerts
FINAL
WHERE (tenant_id = 451) AND (NOT acked)

┌─count()─┬────────────────data─┐
│      98 │ 8095029702435009750 │
└─────────┴─────────────────────┘

1 rows in set. Elapsed: 0.025 sec. Processed 16.38 thousand rows, 5.29 MB (650.31 thousand rows/s., 210.01 MB/s.) 

查询及其快,只有25ms就可以查询出非确认的数据。为啥如此快呢?
在这个过滤条件中不同的是tenant_id 是主键的一部分,所以clickhouse在FINAL之前就可以过滤数据,ReplacingMergeTree 变的更加有效。


下面我们尝试过滤一个用户并且查询确认的数据。列的选择性(cardinality )是一样的,有1000个用户使用用户user451:
Clickhouse> SELECT count() FROM alerts FINAL WHERE (ack_user = 'user451') AND acked;

SELECT count()
FROM alerts
FINAL
WHERE (ack_user = 'user451') AND acked

┌─count()─┐
│    9924 │
└─────────┘

1 rows in set. Elapsed: 1.397 sec. Processed 10.00 million rows, 569.71 MB (7.16 million rows/s., 407.76 MB/s.) 
可以看到查询非常慢并且没有使用索引,扫描了1000万条记录。
我们不能在 ack_user列添加索引,因为它会破坏ReplacingMergeTree的语义。 不过,我们可以使用PREWHERE技巧:

Clickhouse> SELECT count() FROM alerts FINAL PREWHERE (ack_user = 'user451') AND acked;

SELECT count()
FROM alerts
FINAL
PREWHERE (ack_user = 'user451') AND acked

┌─count()─┐
│    9924 │
└─────────┘

1 rows in set. Elapsed: 0.277 sec. Processed 10.00 million rows, 567.78 MB (36.04 million rows/s., 2.05 GB/s.) 

PREWHERE是ClickHouse的特殊提示去应用于不同的过滤器。 通常,ClickHouse足够聪明,可以自动将条件移至PREWHERE,
因此用户无需理会。 此次没有发生,便于我们检查核对。


clickhouse以大量的聚合函数功能而闻名,在最新的版本中已经支持超过100个聚合函数,这位经验丰富的用户提供了极大的灵活性。
本案例中我们无需使用任何高级函数,只需要使用argMax,max和any三个函数。
查询租户451 我们可以使用argMax聚合函数:
SELECT count(), sum(cityHash64(*)) data FROM (
  SELECT tenant_id, alert_id, timestamp, 
         argMax(alert_data, ack_time) alert_data, 
         argMax(acked, ack_time) acked,
         max(ack_time) ack_time_,
         argMax(ack_user, ack_time) ack_user
  FROM alerts 
  GROUP BY tenant_id, alert_id, timestamp
) 
WHERE tenant_id=451 AND NOT acked;

结果和查询的行数相同,查询的时间极短。可以看到clickhouse的聚合效率不错,但是缺点是查询变的更加复杂。我们可以让他更加简单:

注意我们确认告警信息的时候只需更新三个列:
acked: 0 => 1
ack_time: 0 => now()
ack_user: '' => ‘user1’

在所有3种情况下,列值都会增加! 因此,我们可以使用 max代替笨重的argMax。
由于我们不更改alert_data,因此在此列上不需要任何实际的汇总。 ClickHouse为此
提供了
友好的any聚合功能。 它可以选择任何值而不会产生额外开销:

SELECT count(), sum(cityHash64(*)) data FROM (
  SELECT tenant_id, alert_id, timestamp, 
    any(alert_data) alert_data, 
    max(acked) acked, 
    max(ack_time) ack_time,
    max(ack_user) ack_user
  FROM alerts
  GROUP BY tenant_id, alert_id, timestamp
) 
WHERE tenant_id=451 AND NOT acked;

查询变的更加简单快速,原因是使用any函数,在alert_data列上无需计算max值

AggregatingMergeTree
AggregatingMergeTree 是clickhouse中最强大的功能之一。当和物化视图结合时,可以实现实时数据聚合的功能。由于我们可以通过AggregatingMergeTree
让查询变的更加高效快速吗?实际上并没有太大的改进。

我们一次只更新一行数据,所以一组只需要聚合两行。对于这种情况,AggregatingMergeTree引擎并不是最佳选择。但是我们可以使用小技巧。
我们知道告警信息始终智慧树先插入未确认的信息再被确认。用户确认告警信息之后只需要修改3列。若我们不重复其他列的
数据,是否可以节省磁盘空间并提升性能呢?

我们使用max聚合函数来实现一个聚合功能的表。我们使用any函数替代max,但是需要一个列可以设置为Nullable,any函数可以选择non-null的值。

DROP TABLE alerts_amt_max;
CREATE TABLE alerts_amt_max (
  tenant_id     UInt32,
  alert_id      String,
  timestamp     DateTime Codec(Delta, LZ4),
  alert_data    SimpleAggregateFunction(max, String),
  acked         SimpleAggregateFunction(max, UInt8),
  ack_time      SimpleAggregateFunction(max, DateTime),
  ack_user      SimpleAggregateFunction(max, LowCardinality(String))
)
Engine = AggregatingMergeTree()
ORDER BY (tenant_id, timestamp, alert_id);


由于原始数据是随机的,我们将从一个alerts表生成一个新的表。
我们使用两个insert语句,一个是确认的告警信息一个是非确认的告警信息。
INSERT INTO alerts_amt_max SELECT * FROM alerts WHERE NOT acked;
INSERT INTO alerts_amt_max 
SELECT tenant_id, alert_id, timestamp,
  '' as alert_data, 
  acked, ack_time, ack_user 
FROM alerts WHERE acked;


对于已确认的事件,我们插入一个空字符串而不是 alert_data 。 
我们知道数据不会改变,并且只能存储一次! 聚合函数将填补空白。 
在实际的应用程序中,我们可以跳过所有未更改的列,并让它们获取默认值。
-- 数据插入之后确认数据:
SELECT  table, sum(rows) AS r,sum(data_compressed_bytes) AS c, sum(data_uncompressed_bytes) AS uc, uc /c AS ratio FROM system.parts WHERE active AND (database = 'datasets') GROUP BY table;

由于是随机字符串我们几乎么有适用压缩,但是聚合小了两倍,因为我们不必存储两次alert_data

SELECT count(), sum(cityHash64(*)) data FROM (
   SELECT tenant_id, alert_id, timestamp, 
          max(alert_data) alert_data, 
          max(acked) acked, 
          max(ack_time) ack_time,
          max(ack_user) ack_user
     FROM alerts_amt_max
   GROUP BY tenant_id, alert_id, timestamp
) 
WHERE tenant_id=451 AND NOT acked;
由于AggregatingMergeTree引擎,我们处理了更小的数据,也更加高效。

Materializing The Update
Clickhouse 将尽其所能的在后台合并数据,删除重复的行并执行聚合。但是有时候,强制合并是有意义的比如释放磁盘空间。
我们可以使用 OPTIMIZE FINAL语句,OPTIMIZE操作是阻塞性且昂贵的,因此不能频繁的执行。我们来看下optimize对查询性能的影响。
OPTIMIZE TABLE alerts FINAL;
OPTIMIZE TABLE alerts_amt_max FINAL;

应该optimize操作之后两个表具有相同的数据行数和其他相同的数据:
Clickhouse> SELECT  table, sum(rows) AS r,sum(data_compressed_bytes) AS c, sum(data_uncompressed_bytes) AS uc, uc /c AS ratio FROM system.parts WHERE active AND (database = 'datasets') GROUP BY table;

SELECT 
    table, 
    sum(rows) AS r, 
    sum(data_compressed_bytes) AS c, 
    sum(data_uncompressed_bytes) AS uc, 
    uc / c AS ratio
FROM system.parts
WHERE active AND (database = 'datasets')
GROUP BY table

┌─table──────────┬────────r─┬──────────c─┬─────────uc─┬──────────────ratio─┐
│ alerts         │ 10000000 │ 2966402515 │ 3060027443 │ 1.0315617747512595 │
│ alerts_amt_max │ 10000000 │ 2966402515 │ 3060027443 │ 1.0315617747512595 │
└────────────────┴──────────┴────────────┴────────────┴────────────────────┘

2 rows in set. Elapsed: 0.003 sec. 


结论:
clickhouse提供了丰富的工具来处理实时更新,比如:
 ReplacingMergeTree, CollapsingMergeTree (not reviewed here), AggregatingMergeTree
 和聚合函数。
 这些方法有三个共同点:
 1.被修改的数据(Modified data)通过插入新版本的数据。clickhouse的INSERT操作极快。
 2.这些方式可以高效的模拟OLTP数据库中的update语义
 3.实际的修改并不是直接发生的,即异步
 特定方法的选取取决于使用场景。
 ReplacecingMergeTree简单明了,对用户来说最方便,但是仅可用于中小型表,或者始终由主键查询数据时使用。
 aggregate functions提供了更大的灵活性和性能,但是需要大量的重写。
 AggregatingMergeTree允许保存存储,仅保留修改的列。

这些是clickhouse DB 设计人员的必备的工具,可以在需要的时候选用。




 

参考:

https://www.altinity.com/blog/2020/4/14/handling-real-time-updates-in-clickhouse

https://www.altinity.com/blog/2018/10/16/updates-in-clickhouse

https://clickhouse.tech/blog/en/2016/how-to-update-data-in-clickhouse/

https://www.altinity.com/blog/2018/10/16/updates-in-clickhouse
https://www.altinity.com/blog/2018/1/23/updatingdeleting-rows-with-clickhouse-part-1
https://www.altinity.com/blog/2018/1/23/updatingdeleting-rows-from-clickhouse-part-2

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐