First Canal
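Canal is Alibaba's open-source component for incremental subscription to and consumption of MySQL binlogs.
Official repository: https://github.com/alibaba/canal/
What follows is the process of setting up a Canal Server locally (on Windows), based on the official Wiki.
Pull the Canal Server image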
bash
docker pull canal/canal-server:v1.1.7
If the pull fails, you can refer to this blog post, or simply use the private mirror image below.
bash
docker pull registry.cn-hangzhou.aliyuncs.com/pusher/canal-server:v1.1.7
docker tag registry.cn-hangzhou.aliyuncs.com/pusher/canal-server:v1.1.7 canal/canal-server:v1.1.7
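Configure MySQL
NOTE
The MySQL 8.0 instance used here is deployed on K8s. Its manifest is given later in Appendix 1. MySQL 8.0 deployment manifest, for reference only.
Add the binlog configuration
In my case I edited the my.cnf entry in mysql-8-config and then restarted the MySQL service.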
ini
[mysqld]
log-bin=mysql-bin # enables the binlog; adding this line is enough
binlog-format=ROW # use ROW format
server_id=1 # required for MySQL replication; must not be the same as Canal's slaveId
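The three settings above are easy to get subtly wrong, so here is a minimal sketch of a sanity check over the my.cnf text. `MyCnfCheck` and its methods are hypothetical names invented for this illustration and are not part of Canal or MySQL. The sketch assumes a simple option file where `#` only ever starts a comment, and it treats `-` and `_` in option names as equivalent, as MySQL option files do.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not part of Canal or MySQL) that sanity-checks a [mysqld] section. */
final class MyCnfCheck {

    /** Collects the options of the [mysqld] section, dropping '#' comments. */
    public static Map<String, String> parseMysqld(String text) {
        Map<String, String> options = new HashMap<>();
        boolean inMysqld = false;
        for (String raw : text.split("\\R")) {
            int hash = raw.indexOf('#');
            String line = (hash >= 0 ? raw.substring(0, hash) : raw).trim();
            if (line.isEmpty()) {
                continue;
            }
            if (line.startsWith("[")) {
                inMysqld = line.equalsIgnoreCase("[mysqld]");
                continue;
            }
            if (!inMysqld) {
                continue;
            }
            int eq = line.indexOf('=');
            String key = (eq >= 0 ? line.substring(0, eq) : line).trim();
            String value = eq >= 0 ? line.substring(eq + 1).trim() : "";
            // Normalize '-' to '_' so log-bin and log_bin are treated alike.
            options.put(key.replace('-', '_').toLowerCase(), value);
        }
        return options;
    }

    /** True when binlog is enabled, the format is ROW, and server_id differs from Canal's slaveId. */
    public static boolean readyForCanal(String text, long canalSlaveId) {
        Map<String, String> o = parseMysqld(text);
        if (!o.containsKey("log_bin") || !"ROW".equalsIgnoreCase(o.get("binlog_format"))) {
            return false;
        }
        String serverId = o.get("server_id");
        return serverId != null && Long.parseLong(serverId) != canalSlaveId;
    }

    public static void main(String[] args) {
        String myCnf = "[mysqld]\n"
                + "log-bin=mysql-bin\n"
                + "binlog-format=ROW\n"
                + "server_id=1\n";
        // 2 is the canal.instance.mysql.slaveId value used later in this post.
        System.out.println(readyForCanal(myCnf, 2)); // prints true
    }
}
```

This only inspects the file text; it does not confirm what the running server actually loaded.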
Create the canal user
sql
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
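Start Canal Server
The official documentation provides a startup script, which can be downloaded and run with the commands below.
Download the run script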
bash
wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run.sh
Create a queue whose destination name is test
bash
sh run.sh -e canal.auto.scan=false \
-e canal.destinations=test \
-e canal.instance.master.address=127.0.0.1:3306 \
-e canal.instance.dbUsername=canal \
-e canal.instance.dbPassword=canal \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false
Starting the image on Windows
Since I am on Windows, I had to assemble the docker run arguments by hand.
bash
docker run -d --privileged=true -it -h 127.0.0.1 -e canal.instance.mysql.slaveId=2 -e canal.auto.scan=false -e canal.destinations=test -e canal.instance.master.address=127.0.0.1:3306 -e canal.instance.dbUsername=canal -e canal.instance.dbPassword=canal -e canal.instance.connectionCharset=UTF-8 -e canal.instance.tsdb.enable=true -e canal.instance.gtidon=false --name=canal-server-test-8 -p 11110:11110 -p 11111:11111 -p 11112:11112 -p 9100:9100 -m 4096m canal/canal-server:v1.1.7
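A one-line command this long is easy to mistype. As a minimal sketch, the same argument list can be assembled from a map of Canal properties. `CanalDockerCommand` and `build` are hypothetical names made up for this illustration; the flags, ports, container name and image tag are copied from the command above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not part of Canal) that assembles the docker run arguments. */
final class CanalDockerCommand {

    /** Fixed flags first, then one -e per property, the name, one -p per port, memory limit, image. */
    public static List<String> build(String name, String image, Map<String, String> props, int... ports) {
        List<String> cmd = new ArrayList<>(Arrays.asList(
                "docker", "run", "-d", "--privileged=true", "-it", "-h", "127.0.0.1"));
        for (Map.Entry<String, String> e : props.entrySet()) {
            cmd.add("-e");
            cmd.add(e.getKey() + "=" + e.getValue());
        }
        cmd.add("--name=" + name);
        for (int port : ports) {
            cmd.add("-p");
            cmd.add(port + ":" + port); // same port on host and container
        }
        cmd.add("-m");
        cmd.add("4096m");
        cmd.add(image);
        return cmd;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the properties in insertion order.
        Map<String, String> props = new LinkedHashMap<>();
        props.put("canal.instance.mysql.slaveId", "2");
        props.put("canal.auto.scan", "false");
        props.put("canal.destinations", "test");
        props.put("canal.instance.master.address", "127.0.0.1:3306");
        props.put("canal.instance.dbUsername", "canal");
        props.put("canal.instance.dbPassword", "canal");
        props.put("canal.instance.connectionCharset", "UTF-8");
        props.put("canal.instance.tsdb.enable", "true");
        props.put("canal.instance.gtidon", "false");

        List<String> cmd = build("canal-server-test-8", "canal/canal-server:v1.1.7",
                props, 11110, 11111, 11112, 9100);
        System.out.println(String.join(" ", cmd));
        // To launch the container instead of printing the command:
        // new ProcessBuilder(cmd).inheritIO().start();
    }
}
```

Printing the command first makes it possible to compare it against the hand-written line before running anything.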
After startup, the log on my side looked like this:
txt
DOCKER_DEPLOY_TYPE=VM
==> INIT /alidata/init/02init-sshd.sh
==> EXIT CODE: 0
==> INIT /alidata/init/fix-hosts.py
==> EXIT CODE: 0
==> INIT DEFAULT
Failed to get D-Bus connection: Operation not permitted
Failed to get D-Bus connection: Operation not permitted
==> INIT DONE
==> RUN /home/admin/app.sh
==> START ...
start canal ...
start canal successful
==> START SUCCESSFUL ...
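Two Failed to get D-Bus connection: Operation not permitted errors appear, but they do not seem to affect usage.
Run the Canal Client
Here I ran the official example SimpleCanalClientTest directly. Because canal.destinations was set to test above, the destination in the example has to be changed to the same name.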
java
package com.alibaba.otter.canal.example;

import java.net.InetSocketAddress;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;

/**
 * Test example for standalone mode
 *
 * @author jianghang 2013-4-15 04:19:20 PM
 * @version 1.0.4
 */
public class SimpleCanalClientTest extends AbstractCanalClientTest {

    public SimpleCanalClientTest(String destination){
        super(destination);
    }

    public static void main(String args[]) {
        // Create the connection directly from the IP; no HA support
        String destination = "test";
        String ip = AddressUtils.getHostIp();
        CanalConnector connector = CanalConnectors
            .newSingleConnector(new InetSocketAddress(ip, 11111), destination, "canal", "canal");

        final SimpleCanalClientTest clientTest = new SimpleCanalClientTest(destination);
        clientTest.setConnector(connector);
        clientTest.start();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                logger.info("## stop the canal client");
                clientTest.stop();
            } catch (Throwable e) {
                logger.warn("##something goes wrong when stopping canal:", e);
            } finally {
                logger.info("## canal client is down.");
            }
        }));
    }
}
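After it started, I performed the following operations in the DB:
- Create the database canal_test
- Create the table t_user
- Insert data
- Update data
- Delete data
The Canal Client printed the following log: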
txt
****************************************************
* Batch Id: [1] ,count : [1] , memsize : [181] , Time : 2024-09-25 11:00:05
* Start : [mysql-bin.000001:979:1727233201000(2024-09-25 11:00:01)]
* End : [mysql-bin.000001:979:1727233201000(2024-09-25 11:00:01)]
****************************************************
----------------> binlog[mysql-bin.000001:979] , name[,] , eventType : QUERY , executeTime : 1727233201000(2024-09-25 11:00:01) , gtid : () , delay : 4793 ms
ddl : true , sql ----> CREATE DATABASE `canal_test` CHARACTER SET 'utf8mb4' COLLATE 'utf8mb4_general_ci'
****************************************************
* Batch Id: [2] ,count : [1] , memsize : [307] , Time : 2024-09-25 11:01:33
* Start : [mysql-bin.000001:1239:1727233293000(2024-09-25 11:01:33)]
* End : [mysql-bin.000001:1239:1727233293000(2024-09-25 11:01:33)]
****************************************************
----------------> binlog[mysql-bin.000001:1239] , name[canal_test,t_user] , eventType : CREATE , executeTime : 1727233293000(2024-09-25 11:01:33) , gtid : () , delay : 823 ms
ddl : true , sql ----> CREATE TABLE `canal_test`.`t_user` (
`id` bigint UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 'ID',
`name` varchar(80) NULL COMMENT '姓名',
PRIMARY KEY (`id`)
) ENGINE = InnoDB COMMENT = '用户表'
****************************************************
* Batch Id: [3] ,count : [3] , memsize : [165] , Time : 2024-09-25 11:01:45
* Start : [mysql-bin.000001:1625:1727233305000(2024-09-25 11:01:45)]
* End : [mysql-bin.000001:1823:1727233305000(2024-09-25 11:01:45)]
****************************************************
================> binlog[mysql-bin.000001:1625] , executeTime : 1727233305000(2024-09-25 11:01:45) , gtid : () , delay : 711ms
BEGIN ----> Thread id: 128
----------------> binlog[mysql-bin.000001:1770] , name[canal_test,t_user] , eventType : INSERT , executeTime : 1727233305000(2024-09-25 11:01:45) , gtid : () , delay : 718 ms
id : 1 type=bigint unsigned update=true
name : 用户 1 type=varchar(80) update=true
----------------
END ----> transaction id: 688
================> binlog[mysql-bin.000001:1823] , executeTime : 1727233305000(2024-09-25 11:01:45) , gtid : () , delay : 729ms
****************************************************
* Batch Id: [4] ,count : [3] , memsize : [165] , Time : 2024-09-25 11:02:20
* Start : [mysql-bin.000001:1933:1727233340000(2024-09-25 11:02:20)]
* End : [mysql-bin.000001:2131:1727233340000(2024-09-25 11:02:20)]
****************************************************
================> binlog[mysql-bin.000001:1933] , executeTime : 1727233340000(2024-09-25 11:02:20) , gtid : () , delay : 814ms
BEGIN ----> Thread id: 128
----------------> binlog[mysql-bin.000001:2078] , name[canal_test,t_user] , eventType : INSERT , executeTime : 1727233340000(2024-09-25 11:02:20) , gtid : () , delay : 814 ms
id : 2 type=bigint unsigned update=true
name : 用户 2 type=varchar(80) update=true
----------------
END ----> transaction id: 707
================> binlog[mysql-bin.000001:2131] , executeTime : 1727233340000(2024-09-25 11:02:20) , gtid : () , delay : 814ms
****************************************************
* Batch Id: [5] ,count : [3] , memsize : [195] , Time : 2024-09-25 11:02:23
* Start : [mysql-bin.000001:2241:1727233343000(2024-09-25 11:02:23)]
* End : [mysql-bin.000001:2469:1727233343000(2024-09-25 11:02:23)]
****************************************************
================> binlog[mysql-bin.000001:2241] , executeTime : 1727233343000(2024-09-25 11:02:23) , gtid : () , delay : 640ms
BEGIN ----> Thread id: 128
----------------> binlog[mysql-bin.000001:2395] , name[canal_test,t_user] , eventType : UPDATE , executeTime : 1727233343000(2024-09-25 11:02:23) , gtid : () , delay : 640 ms
id : 1 type=bigint unsigned
name : 用户 1-1 type=varchar(80) update=true
----------------
END ----> transaction id: 710
================> binlog[mysql-bin.000001:2469] , executeTime : 1727233343000(2024-09-25 11:02:23) , gtid : () , delay : 641ms
****************************************************
* Batch Id: [6] ,count : [3] , memsize : [165] , Time : 2024-09-25 12:00:18
* Start : [mysql-bin.000001:2579:1727236818000(2024-09-25 12:00:18)]
* End : [mysql-bin.000001:2777:1727236818000(2024-09-25 12:00:18)]
****************************************************
================> binlog[mysql-bin.000001:2579] , executeTime : 1727236818000(2024-09-25 12:00:18) , gtid : () , delay : 302ms
BEGIN ----> Thread id: 128
----------------> binlog[mysql-bin.000001:2724] , name[canal_test,t_user] , eventType : DELETE , executeTime : 1727236818000(2024-09-25 12:00:18) , gtid : () , delay : 303 ms
id : 2 type=bigint unsigned
name : 用户 2 type=varchar(80)
----------------
END ----> transaction id: 2105
================> binlog[mysql-bin.000001:2777] , executeTime : 1727236818000(2024-09-25 12:00:18) , gtid : () , delay : 305ms
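The Start and End lines in each batch header follow the file:offset:timestamp(date) layout produced by buildPositionForDump in the example code. As a minimal sketch, such a string can be parsed back into its parts, for instance to compare offsets across batches. `BinlogPosition` is a hypothetical class written for this illustration and is not part of the Canal client.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical parser for the "file:offset:timestamp(date)" strings in the Start/End lines. */
final class BinlogPosition {

    // file name (no ':'), numeric offset, numeric epoch millis, then "(" and the rest.
    private static final Pattern FORMAT = Pattern.compile("^([^:]+):(\\d+):(\\d+)\\(.*");

    public final String file;
    public final long offset;
    public final long executeTimeMillis;

    private BinlogPosition(String file, long offset, long executeTimeMillis) {
        this.file = file;
        this.offset = offset;
        this.executeTimeMillis = executeTimeMillis;
    }

    /** Parses one position string; throws IllegalArgumentException on an unexpected layout. */
    public static BinlogPosition parse(String text) {
        Matcher m = FORMAT.matcher(text.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("unrecognized position: " + text);
        }
        return new BinlogPosition(m.group(1), Long.parseLong(m.group(2)), Long.parseLong(m.group(3)));
    }

    public static void main(String[] args) {
        // Values copied from batch 3 in the log above.
        BinlogPosition start = parse("mysql-bin.000001:1625:1727233305000(2024-09-25 11:01:45)");
        BinlogPosition end = parse("mysql-bin.000001:1823:1727233305000(2024-09-25 11:01:45)");
        System.out.println(start.file + " @ " + start.offset);
        System.out.println("offset distance between first and last entry: " + (end.offset - start.offset));
    }
}
```

For batch 3 the first and last entries sit at offsets 1625 and 1823, so the distance printed is 198.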
Appendix 1. MySQL 8.0 deployment manifest
yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    deployment.kubernetes.io/revision: "7"
  creationTimestamp: "2024-04-09T09:37:59Z"
  generation: 9
  labels:
    app: mysql-8
    release: mysql-8
  name: mysql-8
  namespace: common
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 2
  selector:
    matchLabels:
      app: mysql-8
      release: mysql-8
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        app: mysql-8
        release: mysql-8
    spec:
      containers:
      - env:
        - name: TZ
          value: Asia/Shanghai
        - name: MYSQL_ROOT_PASSWORD
          valueFrom:
            secretKeyRef:
              key: mysql-root-password
              name: mysql-8-secret
        image: mysql:8.0.31
        imagePullPolicy: IfNotPresent
        livenessProbe:
          exec:
            command:
            - sh
            - -c
            - mysqladmin ping -u root -p${MYSQL_ROOT_PASSWORD}
          failureThreshold: 3
          initialDelaySeconds: 30
          periodSeconds: 10
          successThreshold: 1
          timeoutSeconds: 5
        name: mysql-8
        ports:
        - containerPort: 3306
          name: mysql
          protocol: TCP
        readinessProbe:
          exec:
            command:
            - sh
            - -c
            - mysqladmin ping -u root -p${MYSQL_ROOT_PASSWORD}
          failureThreshold: 3
          initialDelaySeconds: 5
          periodSeconds: 10
          successThreshold: 1
          timeoutSeconds: 1
        resources:
          requests:
            cpu: 100m
            memory: 256Mi
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /var/lib/mysql
          name: data
        - mountPath: /etc/mysql/conf.d/
          name: my-conf-volume
          readOnly: true
      dnsPolicy: ClusterFirst
      initContainers:
      - command:
        - rm
        - -fr
        - /var/lib/mysql/lost+found
        image: busybox:1.32
        imagePullPolicy: IfNotPresent
        name: remove-lost-found
        resources:
          requests:
            cpu: 10m
            memory: 10Mi
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /var/lib/mysql
          name: data
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      serviceAccount: default
      serviceAccountName: default
      terminationGracePeriodSeconds: 30
      volumes:
      - name: data
        persistentVolumeClaim:
          claimName: pvc-common-mysql-8
      - configMap:
          defaultMode: 420
          items:
          - key: my.cnf
            path: my.cnf
          name: mysql-8-config
        name: my-conf-volume
Appendix 2. Canal Client example code
SimpleCanalClientTest.java
java
package com.alibaba.otter.canal.example;

import java.net.InetSocketAddress;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;

/**
 * Test example for standalone mode
 *
 * @author jianghang 2013-4-15 04:19:20 PM
 * @version 1.0.4
 */
public class SimpleCanalClientTest extends AbstractCanalClientTest {

    public SimpleCanalClientTest(String destination){
        super(destination);
    }

    public static void main(String args[]) {
        // Create the connection directly from the IP; no HA support
        String destination = "test";
        String ip = AddressUtils.getHostIp();
        CanalConnector connector = CanalConnectors
            .newSingleConnector(new InetSocketAddress(ip, 11111), destination, "canal", "canal");

        final SimpleCanalClientTest clientTest = new SimpleCanalClientTest(destination);
        clientTest.setConnector(connector);
        clientTest.start();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                logger.info("## stop the canal client");
                clientTest.stop();
            } catch (Throwable e) {
                logger.warn("##something goes wrong when stopping canal:", e);
            } finally {
                logger.info("## canal client is down.");
            }
        }));
    }
}
AbstractCanalClientTest.java
java
package com.alibaba.otter.canal.example;

import org.slf4j.MDC;
import org.springframework.util.Assert;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.Message;

/**
 * Base class for the tests
 *
 * @author jianghang 2013-4-15 04:17:12 PM
 * @version 1.0.4
 */
public class AbstractCanalClientTest extends BaseCanalClientTest {

    public AbstractCanalClientTest(String destination){
        this(destination, null);
    }

    public AbstractCanalClientTest(String destination, CanalConnector connector){
        this.destination = destination;
        this.connector = connector;
    }

    protected void start() {
        Assert.notNull(connector, "connector is null");
        thread = new Thread(this::process);
        thread.setUncaughtExceptionHandler(handler);
        running = true;
        thread.start();
    }

    protected void stop() {
        if (!running) {
            return;
        }
        running = false;
        if (thread != null) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                // ignore
            }
        }

        MDC.remove("destination");
    }

    protected void process() {
        int batchSize = 5 * 1024;
        while (running) {
            try {
                MDC.put("destination", destination);
                connector.connect();
                connector.subscribe();
                while (running) {
                    Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        // try {
                        // Thread.sleep(1000);
                        // } catch (InterruptedException e) {
                        // }
                    } else {
                        printSummary(message, batchId, size);
                        printEntry(message.getEntries());
                    }

                    if (batchId != -1) {
                        connector.ack(batchId); // acknowledge the batch
                    }
                }
            } catch (Throwable e) {
                logger.error("process error!", e);
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e1) {
                    // ignore
                }

                connector.rollback(); // processing failed, roll back
            } finally {
                connector.disconnect();
                MDC.remove("destination");
            }
        }
    }
}
BaseCanalClientTest.java
java
package com.alibaba.otter.canal.example;

import java.io.UnsupportedEncodingException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.google.protobuf.InvalidProtocolBufferException;

public class BaseCanalClientTest {

    protected final static Logger logger = LoggerFactory
        .getLogger(AbstractCanalClientTest.class);
    protected static final String SEP = SystemUtils.LINE_SEPARATOR;
    protected static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
    protected volatile boolean running = false;
    protected Thread.UncaughtExceptionHandler handler = (t, e) -> logger.error("parse events has an error", e);
    protected Thread thread = null;
    protected CanalConnector connector;
    protected static String context_format = null;
    protected static String row_format = null;
    protected static String transaction_format = null;
    protected String destination;

    static {
        context_format = SEP + "****************************************************" + SEP;
        context_format += "* Batch Id: [{}] ,count : [{}] , memsize : [{}] , Time : {}" + SEP;
        context_format += "* Start : [{}] " + SEP;
        context_format += "* End : [{}] " + SEP;
        context_format += "****************************************************" + SEP;

        row_format = SEP
            + "----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {}({}) , gtid : ({}) , delay : {} ms"
            + SEP;

        transaction_format = SEP
            + "================> binlog[{}:{}] , executeTime : {}({}) , gtid : ({}) , delay : {}ms"
            + SEP;
    }

    protected void printSummary(Message message, long batchId, int size) {
        long memsize = 0;
        for (Entry entry : message.getEntries()) {
            memsize += entry.getHeader().getEventLength();
        }

        String startPosition = null;
        String endPosition = null;
        if (!CollectionUtils.isEmpty(message.getEntries())) {
            startPosition = buildPositionForDump(message.getEntries().get(0));
            endPosition = buildPositionForDump(message.getEntries().get(message.getEntries().size() - 1));
        }

        SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
        logger.info(context_format, new Object[] { batchId, size, memsize, format.format(new Date()), startPosition,
                endPosition });
    }

    protected String buildPositionForDump(Entry entry) {
        long time = entry.getHeader().getExecuteTime();
        Date date = new Date(time);
        SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
        String position = entry.getHeader().getLogfileName() + ":" + entry.getHeader().getLogfileOffset() + ":"
            + entry.getHeader().getExecuteTime() + "(" + format.format(date) + ")";
        if (StringUtils.isNotEmpty(entry.getHeader().getGtid())) {
            position += " gtid(" + entry.getHeader().getGtid() + ")";
        }
        return position;
    }

    protected void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            long executeTime = entry.getHeader().getExecuteTime();
            long delayTime = new Date().getTime() - executeTime;
            Date date = new Date(entry.getHeader().getExecuteTime());
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) {
                    TransactionBegin begin = null;
                    try {
                        begin = TransactionBegin.parseFrom(entry.getStoreValue());
                    } catch (InvalidProtocolBufferException e) {
                        throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                    }
                    // Print the transaction header: executing thread id and transaction latency
                    logger.info(transaction_format,
                        new Object[] { entry.getHeader().getLogfileName(),
                                String.valueOf(entry.getHeader().getLogfileOffset()),
                                String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
                                entry.getHeader().getGtid(), String.valueOf(delayTime) });
                    logger.info(" BEGIN ----> Thread id: {}", begin.getThreadId());
                    printXAInfo(begin.getPropsList());
                } else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
                    TransactionEnd end = null;
                    try {
                        end = TransactionEnd.parseFrom(entry.getStoreValue());
                    } catch (InvalidProtocolBufferException e) {
                        throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                    }
                    // Print the transaction commit info: transaction id
                    logger.info("----------------\n");
                    logger.info(" END ----> transaction id: {}", end.getTransactionId());
                    printXAInfo(end.getPropsList());
                    logger.info(transaction_format,
                        new Object[] { entry.getHeader().getLogfileName(),
                                String.valueOf(entry.getHeader().getLogfileOffset()),
                                String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
                                entry.getHeader().getGtid(), String.valueOf(delayTime) });
                }

                continue;
            }

            if (entry.getEntryType() == EntryType.ROWDATA) {
                RowChange rowChange = null;
                try {
                    rowChange = RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                }

                EventType eventType = rowChange.getEventType();

                logger.info(row_format,
                    new Object[] { entry.getHeader().getLogfileName(),
                            String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
                            entry.getHeader().getTableName(), eventType,
                            String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
                            entry.getHeader().getGtid(), String.valueOf(delayTime) });

                if (eventType == EventType.QUERY || rowChange.getIsDdl()) {
                    logger.info("ddl : " + rowChange.getIsDdl() + " , sql ----> " + rowChange.getSql() + SEP);
                    continue;
                }

                printXAInfo(rowChange.getPropsList());
                for (RowData rowData : rowChange.getRowDatasList()) {
                    if (eventType == EventType.DELETE) {
                        printColumn(rowData.getBeforeColumnsList());
                    } else if (eventType == EventType.INSERT) {
                        printColumn(rowData.getAfterColumnsList());
                    } else {
                        printColumn(rowData.getAfterColumnsList());
                    }
                }
            }
        }
    }

    protected void printColumn(List<Column> columns) {
        for (Column column : columns) {
            StringBuilder builder = new StringBuilder();
            try {
                if (StringUtils.containsIgnoreCase(column.getMysqlType(), "BLOB")
                    || StringUtils.containsIgnoreCase(column.getMysqlType(), "BINARY")) {
                    // get value bytes
                    builder.append(
                        column.getName() + " : " + new String(column.getValue().getBytes("ISO-8859-1"), "UTF-8"));
                } else {
                    builder.append(column.getName() + " : " + column.getValue());
                }
            } catch (UnsupportedEncodingException e) {
            }
            builder.append(" type=" + column.getMysqlType());
            if (column.getUpdated()) {
                builder.append(" update=" + column.getUpdated());
            }
            builder.append(SEP);
            logger.info(builder.toString());
        }
    }

    protected void printXAInfo(List<Pair> pairs) {
        if (pairs == null) {
            return;
        }

        String xaType = null;
        String xaXid = null;
        for (Pair pair : pairs) {
            String key = pair.getKey();
            if (StringUtils.endsWithIgnoreCase(key, "XA_TYPE")) {
                xaType = pair.getValue();
            } else if (StringUtils.endsWithIgnoreCase(key, "XA_XID")) {
                xaXid = pair.getValue();
            }
        }

        if (xaType != null && xaXid != null) {
            logger.info(" ------> " + xaType + " " + xaXid);
        }
    }

    public void setConnector(CanalConnector connector) {
        this.connector = connector;
    }

    /**
     * Example of reading the GTID of the current Entry
     *
     * @param header
     * @return
     */
    public static String getCurrentGtid(CanalEntry.Header header) {
        List<CanalEntry.Pair> props = header.getPropsList();
        if (props != null && props.size() > 0) {
            for (CanalEntry.Pair pair : props) {
                if ("curtGtid".equals(pair.getKey())) {
                    return pair.getValue();
                }
            }
        }
        return "";
    }

    /**
     * Example of reading the GTID Sequence No of the current Entry
     *
     * @param header
     * @return
     */
    public static String getCurrentGtidSn(CanalEntry.Header header) {
        List<CanalEntry.Pair> props = header.getPropsList();
        if (props != null && props.size() > 0) {
            for (CanalEntry.Pair pair : props) {
                if ("curtGtidSn".equals(pair.getKey())) {
                    return pair.getValue();
                }
            }
        }
        return "";
    }

    /**
     * Example of reading the GTID Last Committed of the current Entry
     *
     * @param header
     * @return
     */
    public static String getCurrentGtidLct(CanalEntry.Header header) {
        List<CanalEntry.Pair> props = header.getPropsList();
        if (props != null && props.size() > 0) {
            for (CanalEntry.Pair pair : props) {
                if ("curtGtidLct".equals(pair.getKey())) {
                    return pair.getValue();
                }
            }
        }
        return "";
    }
}