<>1. CDC简介

<>1.1 CDC种类

FlinkCDC,简单了解下Change Data Capture(变更数据获取)的概念:
监控并捕获数据库的变更,将这些变更按照发生的顺序进行记录,写入消息中间件供其他服务订阅及消费。
CDC的种类:主要分为基于查询和基于Binlog两种方式,区别:

<>1.2 FlinkCDC

Flink自然也不甘示弱,FlinkCDC应运而生,通过flink-cdc-connectors
组件,可以直接从MySQL等数据库直接读取全量数据和增量变更数据的source组件

<>2. 实战Coding

通过一个简单的Demo学会使用FlinkCDC

<>2.1 DataStream方式

通过创建maven项目,通过pom文件注入相关依赖:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</
artifactId> <version>1.12.0</version> </dependency> <dependency> <groupId>
org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <
version>1.12.0</version> </dependency> <dependency> <groupId>org.apache.flink</
groupId> <artifactId>flink-clients_2.12</artifactId> <version>1.12.0</version>
</dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>
hadoop-client</artifactId> <version>3.1.3</version> </dependency> <dependency> <
groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>
5.1.49</version> </dependency> <dependency> <groupId>com.alibaba.ververica</
groupId> <artifactId>flink-connector-mysql-cdc</artifactId> <version>1.2.0</
version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>
fastjson</artifactId> <version>1.2.75</version> </dependency> </dependencies> <
build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <
artifactId>maven-assembly-plugin</artifactId> <version>3.0.0</version> <
configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</
descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>
make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals>
</execution> </executions> </plugin> </plugins> </build>
依赖注入后就可以开始Coding…(愉快的打开IDEA)
public class FlinkCDC { public static void main(String[] args) throws Exception
{ //1.创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.
getExecutionEnvironment(); env.setParallelism(1); /*2.Flink-CDC 将读取 binlog
的位置信息以状态的方式保存在 CK, 如果想要做到断点续传,需要从 Checkpoint 或者 Savepoint 启动程序 */
//2.1开启CheckPoint,每五秒做一次CheckPoint env.enableCheckpointing(5); //2.2 指定 CK
的一致性语义 env.getCheckpointConfig().setCheckpointingMode( CheckpointingMode.
EXACTLY_ONCE); //2.3 设置任务关闭的时候保留最后一次 CK 数据 env.getCheckpointConfig().
enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckp ointCleanup.
RETAIN_ON_CANCELLATION); //2.4 指定从 CK 自动重启策略 env.setRestartStrategy(
RestartStrategies.fixedDelayRestart(3, 2000L)); //2.5 设置状态后端 env.setStateBackend
(new FsStateBackend("hdfs://master:8020/flinkCDC")); //2.6 设置访问 HDFS 的用户名 System
.setProperty("HADOOP_USER_NAME", "root"); //3.创建 Flink-MySQL-CDC 的 Source
DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder() .
hostname("master") .port(3306) .username("root") .password("000000") .
databaseList("mall-flink") .tableList("mall-flink.z_user_info")
//可选配置项,如果不指定该参数,则会读取上一个配置下的所有表的数据,注意:指定的时候需要使用"db.table"的方式 .startupOptions(
StartupOptions.initial()) .deserializer(new StringDebeziumDeserializationSchema(
)) .build(); //4.使用 CDC Source 从 MySQL 读取数据 DataStreamSource<String> mysqlDS =
env.addSource(mysqlSource); //5.打印数据 mysqlDS.print(); //6.执行任务 env.execute(); }
}
ok,到这里代码部分已经完成,接下来开始测试
将代码打包上传至服务器 mvn clean package
(确保MySQL Binlog开启状态,若是首次开始,则需重启MySQL)
启动Flink,HDFS集群,最后启动程序(java -jar FlinkCDC.jar)

<>2.2 FlinkSQL方式

同样首先注入依赖
<dependency> <groupId>org.apache.flink</groupId> <artifactId>
flink-table-planner-blink_2.12</artifactId> <version>1.12.0</version> </
dependency> public class FlinkSQL_CDC { public static void main(String[] args)
throws Exception { //1.创建执行环境 StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); //2.创建
Flink-MySQL-CDC 的 Source tableEnv.executeSql("CREATE TABLE user_info (" + " id
INT," + " name STRING," + " phone_num STRING" + ") WITH (" + " 'connector' =
'mysql-cdc'," + " 'hostname' = 'master'," + " 'port' = '3306'," + " 'username'
= 'root'," + " 'password' = '000000'," + " 'database-name' = 'mall-flink'," + "
'table-name' = 'z_user_info'" + ")"); tableEnv.executeSql("select * from
user_info").print(); env.execute(); } }
<>2.3 自定义反序列化器
public class Flink_CDCWithCustomerSchema { public static void main(String[]
args) throws Exception { //1.创建执行环境 StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1);
//2.创建 Flink-MySQL-CDC 的 Source Properties properties = new Properties();
DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder() .
hostname("master") .port(3306) .username("root") .password("000000") .
databaseList("mall-flink") .tableList("mall-flink.z_user_info") .startupOptions(
StartupOptions.initial()) .deserializer(new DebeziumDeserializationSchema<String
>() { //自定义数据解析器 @Override public void deserialize(SourceRecord sourceRecord,
Collector<String> collector) throws Exception { //获取主题信息,包含着数据库和表名
mysql_binlog_source.gmall-flink.z_user_info String topic = sourceRecord.topic();
String[] arr = topic.split("\\."); String db = arr[1]; String tableName = arr[2]
; //获取操作类型 READ DELETE UPDATE CREATE Envelope.Operation operation = Envelope.
operationFor(sourceRecord); //获取值信息并转换为 Struct 类型 Struct value = (Struct)
sourceRecord.value(); //获取变化后的数据 Struct after = value.getStruct("after"); //创建
JSON 对象用于存储数据信息 JSONObject data = new JSONObject(); for (Field field : after.
schema().fields()) { Object o = after.get(field); data.put(field.name(), o); }
//创建 JSON 对象用于封装最终返回值数据信息 JSONObject result = new JSONObject(); result.put(
"operation", operation.toString().toLowerCase()); result.put("data", data);
result.put("database", db); result.put("table", tableName); //发送数据至下游 collector.
collect(result.toJSONString()); } @Override public TypeInformation<String>
getProducedType() { return TypeInformation.of(String.class); } }) .build();
//3.使用 CDC Source 从 MySQL 读取数据 DataStreamSource<String> mysqlDS = env.addSource(
mysqlSource); //4.打印数据 mysqlDS.print(); //5.执行任务 env.execute(); } }

技术
今日推荐
下载桌面版
GitHub
百度网盘(提取码:draw)
Gitee
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:[email protected]
QQ群:766591547
关注微信