前提:被监听的数据库需要开启 binlog,且连接所用的账号需要具备读取 binlog 的权限(如 REPLICATION SLAVE、REPLICATION CLIENT)。

1. 添加依赖
<!-- Version properties: place these inside the <properties> element of pom.xml. -->
<flink-version>1.12.0</flink-version>
<flink-mysql-version>2.0.0</flink-mysql-version>

<!-- Flink CDC dependencies: place these inside the <dependencies> element of pom.xml. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink-version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink-version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>${flink-version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <!-- Uses the shared property so every Flink artifact stays on the same version. -->
    <version>${flink-version}</version>
</dependency>
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>${flink-mysql-version}</version>
</dependency>
<!-- End of Flink CDC dependencies -->
2. 编写自定义反序列化器,将监听到的变更数据封装为 JSON 结果集
/** * @author: xrp * @date: 2022/05/05/10:25 * @description */ public class
LinkConfig implements DebeziumDeserializationSchema<String> { /** * * @param
sourceRecord * @param collector */ @Override public void deserialize(
SourceRecord sourceRecord, Collector<String> collector) { JSONObject result =
new JSONObject(); String topic = sourceRecord.topic(); String[] fields = topic.
split("\\."); result.put("db", fields[1]); result.put("tableName", fields[2]);
//获取before数据 Struct value = (Struct) sourceRecord.value(); Struct before = value
.getStruct("before"); JSONObject beforeJson = new JSONObject(); if (before !=
null) { beforeJson = getJson(before); } result.put("before", beforeJson);
//获取after数据 Struct after = value.getStruct("after"); JSONObject afterJson = new
JSONObject(); if (after != null) { //获取列信息 afterJson = getJson(after); } result.
put("after", afterJson); //获取操作类型 Envelope.Operation operation = Envelope.
operationFor(sourceRecord); result.put("op", operation); collector.collect(
result.toJSONString()); } private JSONObject getJson(Struct struct) { JSONObject
jsonObject= new JSONObject(); //获取列信息 Schema schema = struct.schema(); List<
Field> fieldList = schema.fields(); for (Field field : fieldList) { jsonObject.
put(field.name(), struct.get(field)); } return jsonObject; } @Override public
TypeInformation<String> getProducedType() { return BasicTypeInfo.
STRING_TYPE_INFO; } }
3. 编写主程序,实现 CommandLineRunner,并使用 @Component 注解(目的是在应用启动后自动执行该方法,进行实时数据监听)
/** * @author: xrp * @date: 2022/05/05/14:49 * @description */ @Component
public class StartUp implements CommandLineRunner { @Override public void run(
String... args) throws Exception { StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1);
DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder() .
hostname("要监听的数据库地址") .port(端口号) .username("数据库账户") .password("数据库密码") .
databaseList("数据库") .tableList("数据库.数据库表1", "数据库。数据库表2") //多个表逗号分隔 .deserializer
(new LinkConfig()) //自定义返回结果集 .startupOptions(StartupOptions.initial()) .
serverTimeZone("UTC") .build(); DataStreamSource<String> streamSource = env.
addSource(sourceFunction); // 多表进行分片处理 OutputTag<String> orderTag = new
OutputTag<>("表1", Types.STRING); OutputTag<String> userTag = new OutputTag<>(
"表2",Types.STRING); SingleOutputStreamOperator<String> process = streamSource.
map((MapFunction<String, JSONObject>) JSON::parseObject).process(new
ProcessFunction<JSONObject, String>() { @Override public void processElement(
JSONObject value, Context context, Collector<String> collector) { if ("表1".
equals(value.getString("tableName"))) { context.output(orderTag, value.
toJSONString()); } else if ("表2".equals(value.getString("tableName"))) { context
.output(userTag, value.toJSONString()); } } }); DataStream<String> orderStream =
process.getSideOutput(orderTag); DataStream<String> userStream = process.
getSideOutput(userTag); orderStream.print(); userStream.print(); //自定义sink
streamSource.addSink(new ListenerOrderSink()); userStream.addSink(new
ListenerUserSink()); env.executeAsync("fLinkCDC"); } }
4. 编写 sink(一张表对应一个 sink)
/** * @author: xrp * @date: 2022/05/24/17:27 * @description 订单sink */ public
class ListenerOrderSink extends RichSinkFunction<String> { private static final
Logger LOGGER = LoggerFactory.getLogger(ListenerOrderSink.class); private
PreparedStatement ps = null; private Connection connection = null; String driver
= "com.mysql.cj.jdbc.Driver"; String url =
"jdbc:mysql://要将监听到的数据同步到哪,另一个数据库地址:端口号/数据库名字?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC"
; String username = "数据库账号"; String password = "数据库密码"; @Override public void
open(Configuration parameters) throws Exception { super.open(parameters);
connection= getConn(); ps = connection.prepareStatement("insert into
数据库名.表名values (?,?,?,?,?,?,?,?,?,?,?)"); } private Connection getConn() { try {
Class.forName(driver); connection = DriverManager.getConnection(url, username,
password); LOGGER.error("数据库连接成功"); } catch (Exception e) { LOGGER.error(
"数据库连接失败"); } return connection; } @Override public void invoke(String p,
Context context) throws Exception { //TranslateJson 将自定义的返回结果集转为具体实体
TranslateJson translateJson = JSON.parseObject(p, TranslateJson.class); if (
ConstantValue.CREATE_INFO.equals(translateJson.getOp())) { ErpOrder erpOrder =
JSON.parseObject(translateJson.getAfter(), ErpOrder.class); // 注意:
字段个数需要与表字段个数对应上 ps.setString(1,erpOrder.getId()); ps.setString(2,erpOrder.
getCode()); ps.setString(3,erpOrder.getCustomerName()); ps.setBigDecimal(4,
erpOrder.getOrderAmount()); ps.setString(5,erpOrder.getUserId()); ps.setString(6
,erpOrder.getUserName()); ps.setString(7,erpOrder.getCreateName()); ps.setString
(8,erpOrder.getCreateDate()); ps.setInt(9,erpOrder.getStatus()); ps.setInt(10,
erpOrder.getOrderId()); ps.setString(11,erpOrder.getOrderCode()); ps.
executeUpdate(); } } @Override public void close() throws Exception { super.
close(); if(connection != null){ connection.close(); } if (ps != null){ ps.close
(); } } }
5. 将自定义的统一监听结果集转为实体
/**
 * Entity form of the JSON change event, with one field per top-level JSON key.
 *
 * <p>Implements {@code Serializable} so that the declared {@code serialVersionUID}
 * actually takes effect.
 *
 * @author: xrp
 * @date: 2022/05/05/10:50
 */
@Data
public class TranslateJson implements java.io.Serializable {

    private static final long serialVersionUID = -74375380912179188L;

    /** Operation type of the change event. */
    private String op;

    /** Row image before the change, as a JSON string. */
    private String before;

    /** Row image after the change, as a JSON string. */
    private String after;

    /** Database name. */
    private String db;

    /** Table name. */
    private String tableName;
}

技术
下载桌面版
GitHub
百度网盘(提取码:draw)
Gitee
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:[email protected]
QQ群:766591547
关注微信