flink sql (jdbc)如何支持where 条件下推数据库
背景
最近在使用 flink sql (jdbc)做离线数据同步(历史数据修复),遇到一个问题,只同步几条数据的情况下,测试环境执行竟然需要30+分钟。
进一步研究,发现where条件没有下推到数据库执行,而是全表读取(排查过程详见下面的文章)。
为了支持过滤条件下推,这里提供一些解决方案。
解决办法
提供两个解决办法: 1、使用分区字段来做过滤 2、修改源码或者自定义connector
1、使用分区字段来做过滤 关于connector配置,详见官网() 我们可以使用分区扫描的功能来实现数据过滤。 举个例子:
create table mysql_test_12 ( ID STRING, NAME STRING, primary key(ID) not enforced ) with ( connector = jdbc, url = jdbc:mysql://${mysql_hosts}:${mysql_port}/sit?useSSL=false&useUnicode=true&characterEncoding=UTF-8, username = ${mysql_username}, password = ${mysql_pass}, scan.fetch-size=1000, table-name = test_12, scan.partition.column = ID, scan.partition.num = 1, scan.partition.lower-bound = 20200604, scan.partition.upper-bound = 20200604 ); create table es_test_12 ( ID STRING, NAME STRING, primary key(ID) not enforced ) with ( connector = ${es_connector}, hosts = ${es_hosts}, username = ${es_username}, password = ${es_pass}, index = test_12 ); insert into es_test_12 select * from mysql_test_12 ;
落库执行的SQL是:
SELECT ID, NAME FROM test_12 WHERE ID BETWEEN 20200604 AND 20200604
上述方法起到数据过滤的效果,减少查询开销,加速程序执行效率,使用起来也比较简单。
但同时也会存在一些缺点: 1)、分区字段目前只支持 数字、日期、时间戳,对于字符串是不支持的; 2)、between 无法使用索引; 2)、使用起来不够灵活,比如不支持 多个过滤条件、函数使用等
2、修改源码或者自定义connector 修改源码或者自定义connector原理差不多,考虑到自定义connector拓展性更强,这里选择自定义connector的方式来解决。 下面记载开发过程,主要提供一下解决思路:
新的connector type为"jdbc-custom"。
1、首先在resource 下创建目录:META-INF/service,在该目录下创建文件org.apache.flink.table.factories.Factory 文件内容为新建的factory类:com.custom.connector.JdbcCustomDynamicTableFactory
2、为了减少代码开发,可以直接拷贝源码 JdbcDynamicTableFactory、JdbcDynamicTableSource 到com.custom.connector目录下,并修改类名为JdbcCustomDynamicTableFactory、JdbcCustomDynamicTableSource 。
JdbcCustomDynamicTableFactory的 IDENTIFIER = “jdbc” 修改为 IDENTIFIER = “jdbc_custom”;
createDynamicTableSource()方法也做下简单修改 3、在JdbcDynamicTableSource#getScanRuntimeProvider() 方法中改造SQL生成逻辑,主要修改的代码如下: 其中customOptions 是新增的配置项,定义了"filter" 选项,在 JdbcCustomDynamicTableFactory 中定义。可以参考jdbcReadOptions定义过程。
最终使用方法如下:
以上通过2种方式优化了flink sql 执行效率,使过滤条件入库执行。
在项目中,本人采用了第二种方式,效果还不错。希望以上思路对你有帮助~~