Waterdrop 1.x import into a ClickHouse distributed table: filter SQL
Following up on the previous post, which left two open questions. For question 2: without modifying the source code, how can we write to a distributed table with local, hash-based sharding?
Let's run some experiments to verify this.
The idea:
Waterdrop supports multiple data-flow pipelines; see the official documentation, "Configuration example 3: a flexible multi-pipeline setup".
Using this feature, we can configure multiple outputs paired one-to-one with multiple filters: each filter uses Spark SQL to hash-split the data and register one of N temporary tables, and the N corresponding outputs then write to the N shards.
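The splitting rule itself can be illustrated with a small stand-alone sketch. Plain Python below, with `zlib.crc32` as a stand-in hash (Spark's `hash()` is actually Murmur3 with seed 42, so the real bucket assignments will differ):

```python
import zlib

def shard_of(user_name: str, num_shards: int) -> int:
    """Mimics the filter predicate abs(hash(user_name)) % N.

    zlib.crc32 is only a stand-in for illustration; it is NOT the
    hash Spark SQL uses, so buckets here won't match the real job."""
    return zlib.crc32(user_name.encode("utf-8")) % num_shards

rows = ["zhangsan", "lisi", "wangwu", "lili", "lucy"]
buckets = {0: [], 1: []}
for name in rows:
    buckets[shard_of(name, 2)].append(name)

# Every row lands in exactly one bucket, so the N filter outputs
# together cover the source exactly once, with no overlap.
print(buckets)
```

Each `sql` filter below implements one of these buckets as a WHERE predicate, and each `clickhouse` output drains one bucket into one shard's local table.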
Now let's test it in practice.
Before testing, make sure your ClickHouse distributed (cluster) setup is complete.
1. Create the tables:
-- Create the local table; this must be executed on every node
DROP TABLE IF EXISTS dw_local.dist_test;
CREATE TABLE dw_local.dist_test(
    id String COMMENT 'id',
    user_name String COMMENT 'user name'
)
ENGINE = MergeTree
PRIMARY KEY (id)
ORDER BY (id)
;
truncate table dw_local.dist_test;
-- Create the distributed table (no sharding key is given here, since
-- in this experiment we write directly to the local tables)
DROP TABLE IF EXISTS dw.dist_test;
CREATE TABLE dw.dist_test(
    id String COMMENT 'id',
    user_name String COMMENT 'user name'
)
ENGINE = Distributed(dw_cluster, dw_local, dist_test);
select * from dw_local.dist_test t ;
select * from dw.dist_test t ;
2. Prepare the data:
vi /tmp/dist_test.csv
id,user_name
1,zhangsan
2,lisi
3,wangwu
4,lili
5,lucy
6,poli
7,lilei
8,hanmeimei
9,liudehua
10,wuyanzu
3. Waterdrop configuration:
vi /tmp/dist_test2.conf
spark {
  spark.app.name = "Waterdrop"
  spark.executor.instances = 1
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}
input {
  file {
    path = "file:///tmp/dist_test.csv"
    format = "csv"
    options.header = "true"
    result_table_name = "dist_test"
  }
}
filter {
  sql {
    sql = "select * from dist_test where abs(hash(user_name)) % 2 = 0"
    result_table_name = "dist_test_0"
  }
  sql {
    sql = "select * from dist_test where abs(hash(user_name)) % 2 = 1"
    result_table_name = "dist_test_1"
  }
}
output {
  clickhouse {
    # ClickHouse node 0, consuming the filter's first output
    source_table_name = "dist_test_0"
    host = "10.1.99.190:8123"
    database = "dw_local"
    table = "dist_test"
    bulk_size = 1
    username = "user"
    password = "password"
  }
  clickhouse {
    # ClickHouse node 1, consuming the filter's second output
    source_table_name = "dist_test_1"
    host = "10.1.99.191:8123"
    database = "dw_local"
    table = "dist_test"
    bulk_size = 1
    username = "user"
    password = "password"
  }
}
4. Run the import:
bin/start-waterdrop.sh --master local[1] --deploy-mode client --config /tmp/dist_test2.conf
5. Query the data:
-- Node 1
select * from dw_local.dist_test t ;
rhtx-yyxx-db02 :) select * from dw_local.dist_test t ;
SELECT *
FROM dw_local.dist_test AS t
Query id: fb82ee42-77d2-4988-996a-2b187837a97f
┌─id─┬─user_name─┐
│ 1 │ zhangsan │
│ 10 │ wuyanzu │
│ 4 │ lili │
│ 5 │ lucy │
│ 8 │ hanmeimei │
│ 9 │ liudehua │
└────┴───────────┘
-- Node 2
SELECT *
FROM dw_local.dist_test AS t
Query id: 16583ae0-9c4e-49c2-8940-cc3b58d910ea
┌─id─┬─user_name─┐
│ 3 │ wangwu │
└────┴───────────┘
┌─id─┬─user_name─┐
│ 2 │ lisi │
└────┴───────────┘
┌─id─┬─user_name─┐
│ 7 │ lilei │
└────┴───────────┘
┌─id─┬─user_name─┐
│ 6 │ poli │
└────┴───────────┘
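As a quick sanity check on the output above: the two nodes' row sets should be disjoint and together cover the 10 source rows exactly. A small sketch, with the ids transcribed by hand from the query results:

```python
# ids 1..10 from dist_test.csv
source = {str(i) for i in range(1, 11)}
# ids observed in the node 1 / node 2 query results above
node1 = {"1", "10", "4", "5", "8", "9"}
node2 = {"3", "2", "7", "6"}

assert node1.isdisjoint(node2), "a row was written to both shards"
assert node1 | node2 == source, "a row was lost"
print("shards are disjoint and complete: OK")
```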
The query results look fine, but let's take a closer look at the execution log.
6. Log analysis
(Waterdrop ASCII-art startup banner trimmed)
22/02/10 15:32:45 INFO Clickhouse: insert into dist_test (`id`,`user_name`) values (?,?)
22/02/10 15:32:45 INFO FileSourceStrategy: Pruning directories with:
22/02/10 15:32:45 INFO FileSourceStrategy: Post-Scan Filters: ((abs(hash(user_name#11, 42)) % 2) = 0)
22/02/10 15:32:45 INFO FileSourceStrategy: Output Data Schema: struct<id: string, user_name: string>
22/02/10 15:32:45 INFO FileSourceScanExec: Pushed Filters:
22/02/10 15:32:45 INFO CodeGenerator: Code generated in 19.274973 ms
22/02/10 15:32:45 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 286.6 KB, free 1457.5 MB)
22/02/10 15:32:45 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 26.4 KB, free 1457.5 MB)
22/02/10 15:32:45 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on rhtx-yyxx-db02:57139 (size: 26.4 KB, free: 1458.4 MB)
22/02/10 15:32:45 INFO SparkContext: Created broadcast 3 from foreachPartition at Clickhouse.scala:162
22/02/10 15:32:45 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194407 bytes, open cost is considered as scanning 4194304 bytes.
22/02/10 15:32:45 INFO SparkContext: Starting job: foreachPartition at Clickhouse.scala:162
22/02/10 15:32:45 INFO DAGScheduler: Got job 1 (foreachPartition at Clickhouse.scala:162) with 1 output partitions
22/02/10 15:32:45 INFO DAGScheduler: Final stage: ResultStage 1 (foreachPartition at Clickhouse.scala:162)
22/02/10 15:32:45 INFO DAGScheduler: Parents of final stage: List()
22/02/10 15:32:45 INFO DAGScheduler: Missing parents: List()
22/02/10 15:32:45 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[14] at foreachPartition at Clickhouse.scala:162), which has no missing parents
22/02/10 15:32:45 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 21.0 KB, free 1457.5 MB)
22/02/10 15:32:45 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 10.6 KB, free 1457.5 MB)
22/02/10 15:32:45 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on rhtx-yyxx-db02:57139 (size: 10.6 KB, free: 1458.4 MB)
22/02/10 15:32:45 INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:1184
22/02/10 15:32:45 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[14] at foreachPartition at Clickhouse.scala:162) (first 15 tasks are for partitions Vector(0))
22/02/10 15:32:45 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
22/02/10 15:32:45 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, executor driver, partition 0, PROCESS_LOCAL, 8241 bytes)
22/02/10 15:32:45 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
22/02/10 15:32:45 INFO CodeGenerator: Code generated in 12.878664 ms
22/02/10 15:32:45 INFO Clickhouse: single mode, the jdbc url is [jdbc:clickhouse://ip:8123/dw_local].
22/02/10 15:32:45 INFO FileScanRDD: Reading File path: file:///tmp/dist_test.csv, range: 0-103, partition values: [empty row]
22/02/10 15:32:45 INFO CodeGenerator: Code generated in 9.624705 ms
22/02/10 15:32:45 INFO Clickhouse: Insert into ClickHouse succeed
22/02/10 15:32:45 INFO Clickhouse: Insert into ClickHouse succeed
22/02/10 15:32:45 INFO Clickhouse: Insert into ClickHouse succeed
22/02/10 15:32:45 INFO Clickhouse: Insert into ClickHouse succeed
22/02/10 15:32:45 INFO Clickhouse: Insert into ClickHouse succeed
22/02/10 15:32:45 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1499 bytes result sent to driver
22/02/10 15:32:45 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 123 ms on localhost (executor driver) (1/1)
22/02/10 15:32:45 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
22/02/10 15:32:45 INFO DAGScheduler: ResultStage 1 (foreachPartition at Clickhouse.scala:162) finished in 0.168 s
22/02/10 15:32:45 INFO DAGScheduler: Job 1 finished: foreachPartition at Clickhouse.scala:162, took 0.172349 s
22/02/10 15:32:45 INFO Clickhouse: insert into dist_test (`id`,`user_name`) values (?,?)
22/02/10 15:32:45 INFO FileSourceStrategy: Pruning directories with:
22/02/10 15:32:45 INFO FileSourceStrategy: Post-Scan Filters: ((abs(hash(user_name#11, 42)) % 2) = 1)
22/02/10 15:32:45 INFO FileSourceStrategy: Output Data Schema: struct<id: string, user_name: string>
22/02/10 15:32:45 INFO FileSourceScanExec: Pushed Filters:
22/02/10 15:32:45 INFO CodeGenerator: Code generated in 16.468001 ms
22/02/10 15:32:45 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 286.6 KB, free 1457.2 MB)
22/02/10 15:32:45 INFO MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 26.4 KB, free 1457.2 MB)
22/02/10 15:32:45 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on rhtx-yyxx-db02:57139 (size: 26.4 KB, free: 1458.3 MB)
22/02/10 15:32:45 INFO SparkContext: Created broadcast 5 from foreachPartition at Clickhouse.scala:162
22/02/10 15:32:45 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194407 bytes, open cost is considered as scanning 4194304 bytes.
22/02/10 15:32:45 INFO SparkContext: Starting job: foreachPartition at Clickhouse.scala:162
22/02/10 15:32:45 INFO DAGScheduler: Got job 2 (foreachPartition at Clickhouse.scala:162) with 1 output partitions
22/02/10 15:32:45 INFO DAGScheduler: Final stage: ResultStage 2 (foreachPartition at Clickhouse.scala:162)
22/02/10 15:32:45 INFO DAGScheduler: Parents of final stage: List()
22/02/10 15:32:45 INFO DAGScheduler: Missing parents: List()
22/02/10 15:32:45 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[19] at foreachPartition at Clickhouse.scala:162), which has no missing parents
22/02/10 15:32:45 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 21.0 KB, free 1457.2 MB)
22/02/10 15:32:45 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 10.6 KB, free 1457.2 MB)
22/02/10 15:32:45 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on rhtx-yyxx-db02:57139 (size: 10.6 KB, free: 1458.3 MB)
22/02/10 15:32:45 INFO SparkContext: Created broadcast 6 from broadcast at DAGScheduler.scala:1184
22/02/10 15:32:45 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (MapPartitionsRDD[19] at foreachPartition at Clickhouse.scala:162) (first 15 tasks are for partitions Vector(0))
22/02/10 15:32:45 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
22/02/10 15:32:45 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, localhost, executor driver, partition 0, PROCESS_LOCAL, 8241 bytes)
22/02/10 15:32:45 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)
22/02/10 15:32:45 INFO ContextCleaner: Cleaned accumulator 20
... (dozens of similar ContextCleaner / BlockManagerInfo cleanup lines trimmed) ...
22/02/10 15:32:45 INFO Clickhouse: single mode, the jdbc url is [jdbc:clickhouse://10.1.99.191:8123/dw_local].
22/02/10 15:32:45 INFO FileScanRDD: Reading File path: file:///tmp/dist_test.csv, range: 0-103, partition values: [empty row]
22/02/10 15:32:45 INFO Clickhouse: Insert into ClickHouse succeed
22/02/10 15:32:45 INFO Clickhouse: Insert into ClickHouse succeed
22/02/10 15:32:45 INFO BlockManagerInfo: Removed broadcast_4_piece0 on rhtx-yyxx-db02:57139 in memory (size: 10.6 KB, free: 1458.4 MB)
22/02/10 15:32:45 INFO Clickhouse: Insert into ClickHouse succeed
22/02/10 15:32:45 INFO Clickhouse: Insert into ClickHouse succeed
22/02/10 15:32:45 INFO Clickhouse: Insert into ClickHouse succeed
22/02/10 15:32:45 INFO Clickhouse: Insert into ClickHouse succeed
22/02/10 15:32:45 INFO Clickhouse: Insert into ClickHouse succeed
22/02/10 15:32:45 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 1456 bytes result sent to driver
22/02/10 15:32:45 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 38 ms on localhost (executor driver) (1/1)
22/02/10 15:32:45 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
22/02/10 15:32:45 INFO DAGScheduler: ResultStage 2 (foreachPartition at Clickhouse.scala:162) finished in 0.076 s
22/02/10 15:32:45 INFO DAGScheduler: Job 2 finished: foreachPartition at Clickhouse.scala:162, took 0.079791 s
22/02/10 15:32:45 INFO SparkUI: Stopped Spark web UI at http://rhtx-yyxx-db02:4040
22/02/10 15:32:45 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
22/02/10 15:32:45 INFO MemoryStore: MemoryStore cleared
22/02/10 15:32:45 INFO BlockManager: BlockManager stopped
22/02/10 15:32:45 INFO BlockManagerMaster: BlockManagerMaster stopped
22/02/10 15:32:45 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
22/02/10 15:32:45 INFO SparkContext: Successfully stopped SparkContext
22/02/10 15:32:45 INFO ShutdownHookManager: Shutdown hook called
22/02/10 15:32:45 INFO ShutdownHookManager: Deleting directory /tmp/spark-2fd877c3-01e6-4ecc-8541-1c51b26b191f
22/02/10 15:32:45 INFO ShutdownHookManager: Deleting directory /tmp/spark-61e1fe3d-f1fb-4bc9-bc6b-e46d17ae027c
Searching for the keyword Reading File path, we find two log lines:
22/02/10 15:32:45 INFO FileScanRDD: Reading File path: file:///tmp/dist_test.csv, range: 0-103, partition values: [empty row]
22/02/10 15:32:45 INFO FileScanRDD: Reading File path: file:///tmp/dist_test.csv, range: 0-103, partition values: [empty row]
This shows the data source was read twice. Moreover, judging from the execution order, node 1's ETL ran to completion before node 2's ETL started. Used in production, this approach scales poorly: with N shards the source would be scanned N times.
7. Open questions
With multiple data flows, how does Waterdrop submit jobs? Is there any configuration that would let it 1) read one batch of data incrementally, 2) run the filters, 3) run the outputs, and then repeat?
The official documentation does not seem to offer such an option.
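As a thought experiment, the desired single-pass behavior can be sketched in plain Python. This is hypothetical pseudologic only: `run_pipeline`, `write_to_shard`, and the in-memory stores are invented for illustration and are not Waterdrop APIs.

```python
def run_pipeline(batches, num_shards, write_to_shard):
    """Hypothetical single-pass flow: each input batch is read once,
    hash-split, and fanned out to every shard before the next batch."""
    for batch in batches:
        slices = {s: [] for s in range(num_shards)}
        for row in batch:
            # stand-in for the filter's abs(hash(user_name)) % N
            slices[hash(row["user_name"]) % num_shards].append(row)
        for shard, rows in slices.items():
            if rows:
                write_to_shard(shard, rows)  # one output per shard

# usage with a toy in-memory "shard" store
stores = {0: [], 1: []}
run_pipeline(
    batches=[[{"id": "1", "user_name": "zhangsan"},
              {"id": "2", "user_name": "lisi"}]],
    num_shards=2,
    write_to_shard=lambda s, rows: stores[s].extend(rows),
)
print(sum(len(v) for v in stores.values()))  # 2: each row written exactly once
```

Under this scheme the source would be scanned once regardless of the number of shards, instead of once per shard as observed in the log.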