注:首先数据库需要开启日志模式
[mysqld]
log-bin=/var/lib/mysql/mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replication 需要定义,不要和 canal 的 slaveId 重复
==============================================================================================================================
docker run -p 11111:11111 --name canal -id canal/canal-server
注:内存不足时canal会自动退出
先将默认配置拷贝到宿主机,再删除临时容器(否则容器名 canal 冲突),最后挂载配置目录重新启动:
docker cp canal:/home/admin/canal-server/conf/ /data/canal/
docker rm -f canal
docker run -p 11111:11111 -v /data/canal/conf:/home/admin/canal-server/conf --name canal -d canal/canal-server
修改配置文件:
/data/canal/conf/example/instance.properties
修改四处:
1.修改id canal.instance.mysql.slaveId=10010
2.配置数据库连接地址 canal.instance.master.address=127.0.0.1:3306
3.配置连接数据库用户名 canal.instance.dbUsername=root
4.配置连接数据库密码 canal.instance.dbPassword=root12
配置完成后重启容器即可
==============================================================================================================================
pom:
<dependency> <groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId> <version>1.1.0</version> </dependency>
CanalThread:
package com.jeesun.gc.jeesunsearch.canal;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Background task that connects to a canal server, subscribes to a set of
 * tables and prints every row change it receives to standard output.
 *
 * <p>Created 2020/12/24 11:50.
 *
 * @author javachen
 */
public class CanalThread implements Runnable {

    /** Pause between polls when the server returned no entries, in milliseconds. */
    private static final long EMPTY_BATCH_SLEEP_MILLIS = 1000L;

    Log log = LogFactory.getLog(CanalThread.class);

    /**
     * Starts listening on the hard-coded canal server for the
     * {@code mytest.goods} and {@code mytest.goods_copy} tables.
     */
    @Override
    public void run() {
        listener("101.201.150.23", "11111", "mytest.goods,mytest.goods_copy");
    }

    /**
     * Connects to the canal destination {@code example} and polls it for row
     * changes until the current thread is interrupted.
     *
     * @param canalHost host name or IP address of the canal server
     * @param canalPort TCP port of the canal server, as a decimal string
     * @param table     subscription filter passed to {@code CanalConnector.subscribe}
     * @throws NumberFormatException if {@code canalPort} is not a valid integer
     */
    public void listener(String canalHost, String canalPort, String table) {
        // Create the connection (empty user name / password).
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(canalHost, Integer.parseInt(canalPort)), "example", "", "");
        int batchSize = 1000;
        try {
            connector.connect();
            // Subscribe to the requested tables.
            connector.subscribe(table);
            // Roll back to the last acknowledged position so unacked batches are re-delivered.
            connector.rollback();
            // Poll until somebody interrupts this thread.
            while (!Thread.currentThread().isInterrupted()) {
                // Fetch up to batchSize entries without acknowledging them yet.
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                boolean hasData = -1 != batchId && !message.getEntries().isEmpty();
                if (hasData) {
                    printEntry(message.getEntries());
                }
                // Confirm the batch.
                connector.ack(batchId);
                if (!hasData) {
                    // getWithoutAck returns immediately when nothing is pending;
                    // back off instead of spinning on the server.
                    try {
                        Thread.sleep(EMPTY_BATCH_SLEEP_MILLIS);
                    } catch (InterruptedException e) {
                        // Restore the flag so the loop condition ends the listener.
                        Thread.currentThread().interrupt();
                        log.warn("canal listener interrupted, stopping", e);
                    }
                }
            }
        } finally {
            connector.disconnect();
        }
    }

    /**
     * Prints the details of every row change contained in the given entries.
     * Transaction begin/end markers are skipped.
     *
     * @param entrys entries of one canal batch
     * @throws RuntimeException if an entry's stored value cannot be parsed as a row change
     */
    private void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (CanalEntry.EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType())
                    || CanalEntry.EntryType.TRANSACTIONEND.equals(entry.getEntryType())) {
                continue;
            }

            CanalEntry.RowChange rowChage = null;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException(
                        "ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            CanalEntry.EventType eventType = rowChage.getEventType();
            System.out.println(String.format(
                    "================> binlog[%s:%s] , 数据库:%s,表名%s , 类型: %s",
                    entry.getHeader().getLogfileName(),
                    entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(),
                    entry.getHeader().getTableName(),
                    eventType));

            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    // A deleted row only has a "before" image.
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    // An inserted row only has an "after" image.
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    // Any other event type: print both images.
                    System.out.println("-------修改之前");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------修改之后");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    /**
     * Prints one row as a JSON object mapping column name to column value.
     *
     * @param columns columns of a single row image
     */
    private void printColumn(List<CanalEntry.Column> columns) {
        Map<String, Object> aaMap = new HashMap<>();
        for (CanalEntry.Column column : columns) {
            aaMap.put(column.getName(), column.getValue());
        }
        System.out.println(new JSONObject(aaMap).toJSONString());
    }
}
JeesunSearchApplication:
package com.jeesun.gc.jeesunsearch; import
com.jeesun.gc.jeesunsearch.canal.CanalThread; import
org.springframework.boot.SpringApplication; import
org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Spring Boot entry point. After the application context has been started,
 * a separate thread is launched that runs {@link CanalThread}.
 */
@SpringBootApplication
public class JeesunSearchApplication {

    /**
     * Boots the Spring application and then starts the canal listener thread.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(JeesunSearchApplication.class, args);

        // Start the canal listener on its own thread once Spring is up.
        Runnable canalListener = new CanalThread();
        Thread worker = new Thread(canalListener);
        worker.start();
    }
}