使用flink-cdc采集mysql数据
前言:在使用flink-cdc采集mysql数据时,会遇到各种问题,本文记录了使用flink-cdc采集mysql的流程操作。
1.版本选择:
本人使用的是flink 1.15.0 和 flink-connector-mysql-cdc 2.2.0
2.冲突问题:
直接引用会有版本冲突:flink-shaded-guava30和flink-shaded-guava18冲突,因为flink15使用的是guava30,而flink-connector-mysql-cdc的2.2版本用的是guava18
3.解决冲突:
这里我选用重新编译flink-cdc-connectors,在git上下载2.2版本的flink-cdc-connectors源码,修改代码中用到flink-shaded-guava18的代码。
3.1 修改项目pom
把flink-shaded-guava的18.0-13.0版本升级到30.1.1-jre-15.0
3.2 修改源码
把import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder改成import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder用到guava18的地方均改为guava30
3.3 重新编译:
mvn spotless:apply mvn clean install -Dmaven.test.skip=true
3.4 其他:
最新master的2.3版本的flink-connector-mysql-cdc直接使用的就是flink-shaded-guava:30.1.1-jre-15.0,也可以替换成2.3版本
4.开发流程
1.导入重新编译的flink-connector-mysql-cdc依赖
2.直接上完整代码
public class FlinkMysqlCDC { public static void main(String[] args) throws Exception { MySqlSource<String> mySqlSource = MySqlSource.<String>builder() .hostname("localhost") .port(3306) .databaseList("hc") .tableList("hc.student") .username("flinkuser") .password("123456") .deserializer(new JsonDebeziumDeserializationSchema()) .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(3000); env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(),"MySql Source") .setParallelism(4) .print().setParallelism(1); env.execute("MySql Source Reader"); } }
3.mysql权限和设置
# 创建mysql用户: CREATE USER flinkuser@localhost IDENTIFIED BY 123456; # 授予用户所需权限 GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO flinkuser IDENTIFIED BY 123456; #最终确定用户的权限 FLUSH PRIVILEGES; # 设置server id (目前没发现需要的实际操作) SELECT * FROM source_table /*+ OPTIONS(server-id=5401-5404) */ ; #mysql会话超时(大型作业可能会用到) interactive_timeout wait_timeout #增量快照原理 #将表拆分成块(chunk),并行读取chunk; #1.记录binlog的low_offset,2.读取快照,3.binlog为high_offset,4.读取low-high的binlog合到chunk中输出,最后单个读取high后的binlog
欢迎分享讨论。
下一篇:
【HBZ分享】后端如何抗高并发