聊聊 Flink SQL增量查询Hudi表

应用开发2025-11-04 18:09:3074836
​官网文档

地址:https://hudi.apache.org/cn/docs/querying_data#incremental-query

参数read.start-commit 增量查询开始时间 对于流读,聊聊如果不指定该值,增量默认取最新的查询instantTime,也就是聊聊流读默认从最新的instantTime开始读(包含最新的)。对于批读,增量如果不指定该参数,查询只指定read.end-commit,聊聊则实现时间旅行的增量功能,可查询历史记录read.end-commit 增量查询结束时间 不指定该参数则默认读取到最新的查询记录,该参数一般只适用于批读,聊聊因为流读一般的增量需求是查询所有的增量数据read.streaming.enabled 是否流读 默认falseread.streaming.check-interval  流读的检查时间间隔,单位秒(s),查询默认值60,也就是聊聊一分钟查询范围 [BEGIN_INSTANTTIME,END_INSTANTTIME],既包含开始时间又包含结束时间,增量对于默认值可参考上面的查询参数说明版本

建表造数:

Hudi 0.9.0Spark 2.4.5

我这里建表造数使用Hudi Spark SQL 0.9.0,目的是为了模拟项目上用Java Client和Spark SQL创建的Hudi表,以验证Hudi Flink SQL增量查询时是否兼容旧版本的Hudi表(大家没有这种需求的云南idc服务商,可以使用任何方式正常造数)

查询

Hudi 0.13.0-SNAPSHOTFlink 1.14.3 (增量查询)Spark 3.1.2 (主要是为了使用Call Procedures命令查看commit信息)建表造数 复制-- Spark SQL Hudi 0.9.0create table hudi.test_flink_incremental ( id int, name string, price double, ts long,

dt string

)

using hudi

partitioned by (dt) options ( primaryKey = id, preCombineField = ts, type = cow);insert into hudi.test_flink_incremental values (1,a1, 10, 1000, 2022-11-25);insert into hudi.test_flink_incremental values (2,a2, 20, 2000, 2022-11-25);update hudi.test_flink_incremental set name=hudi2_update where id = 2;insert into hudi.test_flink_incremental values (3,a3, 30, 3000, 2022-11-26);insert into hudi.test_flink_incremental values (4,a4, 40, 4000, 2022-12-26);1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.

用show_commits看一下有哪些commits(这里查询用的是Hudi的master,因为show_commits是在0.11.0版本开始支持的,也可以通过使用hadoop命令查看.hoodie文件夹下的.commit文件)

复制call show_commits(table => hudi.test_flink_incremental);1. 复制20221205152736202212051527232022120515271220221205152702202212051526501.2.3.4.5. Flink SQL创建Hudi内存表 复制CREATE TABLE test_flink_incremental ( id int PRIMARY KEY NOT ENFORCED, name VARCHAR(10), price double, ts bigint, dt VARCHAR(10))PARTITIONED BY (dt)WITH ( connector = hudi, path = hdfs://cluster1/warehouse/tablespace/managed/hive/hudi.db/test_flink_incremental); 1.2.3.4.5.6.7.8.9.10.11.12.

建表时不指定增量查询相关的参数,我们在查询时动态指定,这样比较灵活。动态指定参数方法,在查询语句后面加上如下形式的语句

复制/*+ options( read.start-commit = 20221205152723, read.end-commit=20221205152736) */1.2.3.4.5.6. 批读

Flink SQL读Hudi有两种模式:批读和流读。默认批读,先看一下批读的增量查询

验证是否包含起始时间和默认结束时间

复制select * from

test_flink_incremental

/*+ options( read.start-commit = 20221205152723 --起始时间对应id=3的记录) */1.2.3.4.5.6.

结果包含起始时间,不指定结束时间默认读到最新的数据

复制id name price ts dt

4 a4 40.0 4000 dt=2022-12-26 3 a3 30.0 3000 dt=2022-11-261.2.3.

验证是否包含结束时间

复制select * from

test_flink_incremental

/*+ options( read.start-commit = 20221205152712, --起始时间对应id=2的记录 read.end-commit=20221205152723 --结束时间对应id=3的记录) */1.2.3.4.5.6.7.

结果包含结束时间

复制id name price ts dt

3 a3 30.0 3000 dt=2022-11-26 2 hudi2_update 20.0 2000 dt=2022-11-251.2.3.

验证默认开始时间

这种情况是指定结束时间,但不指定开始时间,如果都不指定,则读表所有的最新版本的记录。b2b信息网

复制select * from

test_flink_incremental

/*+ options( read.end-commit=20221205152712 --结束时间对应id=2的更新记录) */1.2.3.4.5.6.

结果:只查询end-commit对应的记录

复制id name price ts dt

2 hudi2_update 20.0 2000 dt=2022-11-251.2.

时间旅行(查询历史记录)

验证是否可以查询历史记录,我们更新id为2的name,更新前name为a2,更新后为hudi2_update,我们验证一下,是否可以通过FlinkSQL查询Hudi历史记录,预期结果查出id=2,name=a2

复制select * from

test_flink_incremental

/*+ options( read.end-commit=20221205152702 --结束时间对应id=2的历史记录) */1.2.3.4.5.6.

结果:可以正确查询历史记录

复制id name price ts dt

2 a2 20.0 2000 dt=2022-11-251.2. 流读

开启流读的参数:

复制read.streaming.enabled = true1.

流读不需要设置结束时间,因为一般的需求是读所有的增量数据,我们只需要验证开始时间就好了

验证默认开始时间

复制select * from

test_flink_incremental

/*+ options( read.streaming.enabled=true, read.streaming.check-interval = 4) */1.2.3.4.5.6.7.

结果:从最新的instantTime开始增量读取,也就是默认的read.start-commit为最新的instantTime

复制id name price ts dt

4 a4 40.0 4000 dt=2022-12-261.2.

验证指定开始时间

复制select * from

test_flink_incremental

/*+ options( read.streaming.enabled=true, read.streaming.check-interval = 4, read.start-commit = 20221205152712) */1.2.3.4.5.6.7.8.

结果:

复制id name price ts dt

2 hudi2_update 20.0 2000 dt=2022-11-25 3 a3 30.0 3000 dt=2022-11-26 4 a4 40.0 4000 dt=2022-11-261.2.3.4.

如果想第一次查询全部的历史数据,可以将start-commit设置的早一点,比如设置到去年:read.start-commit = 20211205152712

复制select * from

test_flink_incremental

/*+ options( read.streaming.enabled=true, read.streaming.check-interval = 4, read.start-commit = 20211205152712) */1.2.3.4.5.6.7.8.

复制id name price ts dt

1 a1 10.0 1000 dt=2022-11-25 2 hudi2_update 20.0 2000 dt=2022-11-25 3 a3 30.0 3000 dt=2022-11-26 4 a4 40.0 4000 dt=2022-11-261.2.3.4.5.

验证流读的连续性

验证新的增量数据进来,是否可以持续消费Hudi增量数据,验证数据的准确一致性,服务器托管为了方便验证,我可以使用Flink SQL增量流读Hudi表然后Sink到MySQL表中,最后通过读取MySQL表中的数据验证数据的准确性

Flink SQL读写MySQL需要配置jar包,将flink-connector-jdbc_2.12-1.14.3.jar​放到lib​下即可,下载地址:https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.12/1.14.3/flink-connector-jdbc_2.12-1.14.3.jar

先在MySQL中创建一张Sink表

复制-- MySQLCREATE TABLE `test_sink` ( `id` int(11), `name` text DEFAULT NULL, `price` int(11), `ts` int(11), `dt` text DEFAULT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8;1.2.3.4.5.6.7.8.

Flink中创建对应的sink表

复制create table test_sink ( id int, name string, price double, ts bigint,

dt string

) with ( connector = jdbc, url = jdbc:mysql://192.468.44.128:3306/hudi?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8, username = root, password = root-123, table-name = test_sink, sink.buffer-flush.max-rows = 1);1.2.3.4.5.6.7.8.9.10.11.12.13.14.

然后流式增量读取Hudi表Sink Mysql

复制insert into

test_sink

select * from

test_flink_incremental

/*+ options( read.streaming.enabled=true, read.streaming.check-interval = 4, read.start-commit = 20221205152712) */1.2.3.4.5.6.7.8.9.

这样会起一个长任务,一直处于running状态,我们可以在yarn-session界面上验证这一点

然后先在MySQL中验证一下历史数据的准确性

再利用Spark SQL往source表插入两条数据

复制-- Spark SQLinsert into hudi.test_flink_incremental values (5,a5, 50, 5000, 2022-12-07);insert into hudi.test_flink_incremental values (6,a6, 60, 6000, 2022-12-07);1.2.3.

我们增量读取的间隔设置的4s,成功插入数据等待4s后,再在MySQL表中验证一下数据

发现新增的数据已经成功Sink到MySQL中了,并且数据没有重复

最后验证一下更新的增量数据,Spark SQL更新Hudi source表

复制-- Spark SQLupdate hudi.test_flink_incremental set name=hudi5_update where id = 5;1.2.

继续验证结果

结果是更新的增量数据也会insert到MySQL中的sink表,但是不会更新原来的数据

那如果想实现更新的效果呢?我们需要在MySQL和Flink的sink表中加上主键字段,两者缺一不可,如下:

复制-- MySQLCREATE TABLE `test_sink` ( `id` int(11), `name` text DEFAULT NULL, `price` int(11), `ts` int(11), `dt` text DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;1.2.3.4.5.6.7.8.9. 复制-- Flink SQLcreate table test_sink ( id int PRIMARY KEY NOT ENFORCED, name string, price double, ts bigint,

dt string

) with ( connector = jdbc, url = jdbc:mysql://192.468.44.128:3306/hudi?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8, username = root, password = root-123, table-name = test_sink, sink.buffer-flush.max-rows = 1);1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.

将刚才起的长任务关掉,重新执行刚才的insert语句,先跑一下历史数据,最后再验证一下增量效果

复制-- Spark SQL

update hudi.test_flink_incremental set name=hudi6_update where id = 6;

insert into hudi.test_flink_incremental values (7,a7, 70, 7000, 2022-12-07);1.2.3.

可以看到,达到了预期效果,对于id=6的执行更新操作,对于id=7的执行插入操作。

本文地址:http://www.bzuk.cn/html/193b8499722.html
版权声明

本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。

全站热门

笔记本处理器i5和i7的区别及选择指南(解读i5和i7,选择最适合你的笔记本处理器)

台式电脑开机后显示器无反应怎么办?(解决方法及注意事项)

重新注册苹果ID的步骤与注意事项(简单易行的方法帮助您重新注册苹果ID)

三星S5K3M3(解析三星S5K3M3的性能和特点,为您带来卓越的拍摄体验)

教你简易转换安卓手机m4a音频为mp3格式(一键操作,高效转换,享受更广泛的音频播放体验)

金河田机箱G6(探索金河田机箱G6的设计与功能,为您带来绝佳的使用体验)

小米笔记本U盘装Win7系统教程(小米笔记本U盘装Win7系统的详细步骤和注意事项)

探索以Virgin的网如何改变我们的生活(颠覆传统,创新无限——以Virgin的网为您打开全新世界)

友情链接

滇ICP备2023006006号-33