Note: First, binary logging (binlog) must be enabled on the MySQL database.
[mysqld]
log-bin=/var/lib/mysql/mysql-bin # enable binlog
binlog-format=ROW # use ROW format
server_id=1 # required for MySQL replication; must not be the same as canal's slaveId
==============================================================================================================================
docker run -p 11111:11111 --name canal -id canal/canal-server
Note: canal exits automatically when it runs out of memory.
docker cp canal:/home/admin/canal-server/conf/ /data/canal/
docker run -p 11111:11111 -v /data/canal/conf:/home/admin/canal-server/conf --name canal -d canal/canal-server
Modify the configuration file:
/data/canal/conf/example/instance.properties
Modify four settings:
1. Set the slave id: canal.instance.mysql.slaveId=10010
2. Configure the database connection address: canal.instance.master.address=127.0.0.1:3306
3. Configure the database user name: canal.instance.dbUsername=root
4. Configure the database password: canal.instance.dbPassword=root12
After configuration, restart the container
==============================================================================================================================
pom:
<dependency> <groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId> <version>1.1.0</version> </dependency>
CanalThread:
package com.jeesun.gc.jeesunsearch.canal; import
com.alibaba.fastjson.JSONObject; import
com.alibaba.otter.canal.client.CanalConnector; import
com.alibaba.otter.canal.client.CanalConnectors; import
com.alibaba.otter.canal.protocol.CanalEntry; import
com.alibaba.otter.canal.protocol.Message; import
org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory;
import java.net.InetSocketAddress; import java.util.HashMap; import
java.util.List; import java.util.Map; /** * @author javachen * @date 2020/12/24
11:50 * @desc Created with IntelliJ IDEA. */ public class CanalThread
implements Runnable { Log log = LogFactory.getLog(CanalThread.class); @Override
public void run() { listener("101.201.150.23", "11111",
"mytest.goods,mytest.goods_copy"); } public void listener(String canalHost,
String canalPort, String table) { // create link CanalConnector connector =
CanalConnectors.newSingleConnector(new InetSocketAddress(canalHost,
Integer.valueOf(canalPort)), "example", "", ""); int batchSize = 1000; try { //
connect connector.connect(); // Monitor table connector.subscribe(table);
connector.rollback(); // Loop listening all the time while (true) { // Get the specified amount of data Message message =
connector.getWithoutAck(batchSize); long batchId = message.getId(); if(-1 !=
batchId && 0 != message.getEntries().size()) {
printEntry(message.getEntries()); } // Submit for confirmation connector.ack(batchId); } } finally
{ connector.disconnect(); } } /** * Print specific changes * @param entrys */ private void
printEntry(List<CanalEntry.Entry> entrys) { for (CanalEntry.Entry entry :
entrys) { if
(CanalEntry.EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType()) ||
CanalEntry.EntryType.TRANSACTIONEND.equals(entry.getEntryType())) { continue; }
CanalEntry.RowChange rowChage = null; try { rowChage =
CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error ,
data:" + entry.toString(), e); } CanalEntry.EventType eventType =
rowChage.getEventType(); System.out.println(String.format("================>
binlog[%s:%s] , database :%s, Table name %s , type : %s", entry.getHeader().getLogfileName(),
entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(),
entry.getHeader().getTableName(), eventType)); for (CanalEntry.RowData rowData
: rowChage.getRowDatasList()) { if (eventType == CanalEntry.EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList()); } else if (eventType ==
CanalEntry.EventType.INSERT) { printColumn(rowData.getAfterColumnsList()); }
else { System.out.println("------- Before modification ");
printColumn(rowData.getBeforeColumnsList()); System.out.println("------- After modification ");
printColumn(rowData.getAfterColumnsList()); } } } } private void
printColumn(List<CanalEntry.Column> columns) { Map<String,Object> aaMap = new
HashMap<>(); for (CanalEntry.Column column : columns) {
aaMap.put(column.getName(), column.getValue()); } System.out.println( new
JSONObject(aaMap).toJSONString()); } }
JeesunSearchApplication:
package com.jeesun.gc.jeesunsearch;

import com.jeesun.gc.jeesunsearch.canal.CanalThread;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point. After the application has been started, a
 * {@link CanalThread} is launched on its own thread.
 */
@SpringBootApplication
public class JeesunSearchApplication {

    /**
     * Boots the Spring application and then starts the canal listener thread.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(JeesunSearchApplication.class, args);

        Thread canalWorker = new Thread(new CanalThread());
        canalWorker.start();
    }
}
Technology