Skip to content

First Canal

Canal 是阿里巴巴开源的 MySQL binlog 增量订阅 & 消费组件。

官方仓库:https://github.com/alibaba/canal/

以下是根据官方的 Wiki 文档,在本地(Windows)搭建 Canal Server 的过程。

拉取 Canal Server 镜像

bash
docker pull canal/canal-server:v1.1.7

拉取不下来的可以参考 这篇博客,也可以直接使用下面的私有镜像。

bash
docker pull registry.cn-hangzhou.aliyuncs.com/pusher/canal-server:v1.1.7
docker tag registry.cn-hangzhou.aliyuncs.com/pusher/canal-server:v1.1.7 canal/canal-server:v1.1.7

配置 MySQL

NOTE

这里的 MySQL 8.0 是部署在 K8s 上的。清单见后面的 附 1. MySQL 8.0 的部署清单,仅供参考。

新增 binlog 配置

我这里是修改 mysql-8-config 中的 my.cnf 配置项,然后重启 MySQL 服务即可。

ini
[mysqld]  
log-bin=mysql-bin #添加这一行就 ok  
binlog-format=ROW #选择 row 模式  
server_id=1 #配置 mysql replaction 需要定义,不能和 canal 的 slaveId 重复

创建 canal 用户

sql
CREATE USER canal IDENTIFIED BY 'canal';    
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';  
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;  
FLUSH PRIVILEGES;

启动 Canal Server

官方文档 里提供了启动的脚本,可以通过如下命令下载并启动。

下载运行脚本

bash
wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run.sh

构建一个 destination name 为 test 的队列

bash
sh run.sh -e canal.auto.scan=false \
          -e canal.destinations=test \
          -e canal.instance.master.address=127.0.0.1:3306  \
          -e canal.instance.dbUsername=canal  \
          -e canal.instance.dbPassword=canal  \
          -e canal.instance.connectionCharset=UTF-8 \
          -e canal.instance.tsdb.enable=true \
          -e canal.instance.gtidon=false  \

Windows 下启动镜像

我这里由于是 Windows,只好手动拼 docker run 的参数。

bash
docker run -d --privileged=true -it -h 127.0.0.1 -e canal.instance.mysql.slaveId=2 -e canal.auto.scan=false -e canal.destinations=test -e canal.instance.master.address=127.0.0.1:3306 -e canal.instance.dbUsername=canal -e canal.instance.dbPassword=canal -e canal.instance.connectionCharset=UTF-8 -e canal.instance.tsdb.enable=true -e canal.instance.gtidon=false --name=canal-server-test-8 -p 11110:11110 -p 11111:11111 -p 11112:11112 -p 9100:9100 -m 4096m canal/canal-server:v1.1.7

我这边启动后日志如下:

txt
DOCKER_DEPLOY_TYPE=VM
==> INIT /alidata/init/02init-sshd.sh
==> EXIT CODE: 0
==> INIT /alidata/init/fix-hosts.py
==> EXIT CODE: 0
==> INIT DEFAULT
Failed to get D-Bus connection: Operation not permitted
Failed to get D-Bus connection: Operation not permitted
==> INIT DONE
==> RUN /home/admin/app.sh
==> START ...
start canal ...
start canal successful
==> START SUCCESSFUL ...

出现了两个 Failed to get D-Bus connection: Operation not permitted 的错误,但貌似不影响使用。

运行 Canal Client

这里是直接运行了官方示例 SimpleCanalClientTest。由于上面指定了 canal.destinationstest,所以示例里的 destination 也要修改为一致的名字。

java
package com.alibaba.otter.canal.example;

import java.net.InetSocketAddress;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;

/**
 * 单机模式的测试例子
 * 
 * @author jianghang 2013-4-15 下午 04:19:20
 * @version 1.0.4
 */
public class SimpleCanalClientTest extends AbstractCanalClientTest {

    public SimpleCanalClientTest(String destination){
        super(destination);
    }

    public static void main(String args[]) {
        // 根据 ip,直接创建链接,无 HA 的功能
        String destination = "test";
        String ip = AddressUtils.getHostIp();
        CanalConnector connector = CanalConnectors
            .newSingleConnector(new InetSocketAddress(ip, 11111), destination, "canal", "canal");

        final SimpleCanalClientTest clientTest = new SimpleCanalClientTest(destination);
        clientTest.setConnector(connector);
        clientTest.start();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                logger.info("## stop the canal client");
                clientTest.stop();
            } catch (Throwable e) {
                logger.warn("##something goes wrong when stopping canal:", e);
            } finally {
                logger.info("## canal client is down.");
            }
        }));
    }

}

启动后在 DB 中执行了如下几个操作:

  • 创建数据库 canal_test
  • 创建表 t_user
  • 插入数据
  • 修改数据

Canal Client 打印的日志如下:

txt
****************************************************
* Batch Id: [1] ,count : [1] , memsize : [181] , Time : 2024-09-25 11:00:05
* Start : [mysql-bin.000001:979:1727233201000(2024-09-25 11:00:01)] 
* End : [mysql-bin.000001:979:1727233201000(2024-09-25 11:00:01)] 
****************************************************

----------------> binlog[mysql-bin.000001:979] , name[,] , eventType : QUERY , executeTime : 1727233201000(2024-09-25 11:00:01) , gtid : () , delay : 4793 ms
ddl : true ,  sql ----> CREATE DATABASE `canal_test` CHARACTER SET 'utf8mb4' COLLATE 'utf8mb4_general_ci'

****************************************************
* Batch Id: [2] ,count : [1] , memsize : [307] , Time : 2024-09-25 11:01:33
* Start : [mysql-bin.000001:1239:1727233293000(2024-09-25 11:01:33)] 
* End : [mysql-bin.000001:1239:1727233293000(2024-09-25 11:01:33)] 
****************************************************

----------------> binlog[mysql-bin.000001:1239] , name[canal_test,t_user] , eventType : CREATE , executeTime : 1727233293000(2024-09-25 11:01:33) , gtid : () , delay : 823 ms
ddl : true ,  sql ----> CREATE TABLE `canal_test`.`t_user`  (
  `id` bigint UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `name` varchar(80) NULL COMMENT '姓名',
  PRIMARY KEY (`id`)
) ENGINE = InnoDB COMMENT = '用户表'

****************************************************
* Batch Id: [3] ,count : [3] , memsize : [165] , Time : 2024-09-25 11:01:45
* Start : [mysql-bin.000001:1625:1727233305000(2024-09-25 11:01:45)] 
* End : [mysql-bin.000001:1823:1727233305000(2024-09-25 11:01:45)] 
****************************************************

================> binlog[mysql-bin.000001:1625] , executeTime : 1727233305000(2024-09-25 11:01:45) , gtid : () , delay : 711ms
 BEGIN ----> Thread id: 128
----------------> binlog[mysql-bin.000001:1770] , name[canal_test,t_user] , eventType : INSERT , executeTime : 1727233305000(2024-09-25 11:01:45) , gtid : () , delay : 718 ms
id : 1    type=bigint unsigned    update=true
name : 用户 1    type=varchar(80)    update=true
----------------
 END ----> transaction id: 688
================> binlog[mysql-bin.000001:1823] , executeTime : 1727233305000(2024-09-25 11:01:45) , gtid : () , delay : 729ms

****************************************************
* Batch Id: [4] ,count : [3] , memsize : [165] , Time : 2024-09-25 11:02:20
* Start : [mysql-bin.000001:1933:1727233340000(2024-09-25 11:02:20)] 
* End : [mysql-bin.000001:2131:1727233340000(2024-09-25 11:02:20)] 
****************************************************

================> binlog[mysql-bin.000001:1933] , executeTime : 1727233340000(2024-09-25 11:02:20) , gtid : () , delay : 814ms
 BEGIN ----> Thread id: 128
----------------> binlog[mysql-bin.000001:2078] , name[canal_test,t_user] , eventType : INSERT , executeTime : 1727233340000(2024-09-25 11:02:20) , gtid : () , delay : 814 ms
id : 2    type=bigint unsigned    update=true
name : 用户 2    type=varchar(80)    update=true
----------------
 END ----> transaction id: 707
================> binlog[mysql-bin.000001:2131] , executeTime : 1727233340000(2024-09-25 11:02:20) , gtid : () , delay : 814ms

****************************************************
* Batch Id: [5] ,count : [3] , memsize : [195] , Time : 2024-09-25 11:02:23
* Start : [mysql-bin.000001:2241:1727233343000(2024-09-25 11:02:23)] 
* End : [mysql-bin.000001:2469:1727233343000(2024-09-25 11:02:23)] 
****************************************************

================> binlog[mysql-bin.000001:2241] , executeTime : 1727233343000(2024-09-25 11:02:23) , gtid : () , delay : 640ms
 BEGIN ----> Thread id: 128
----------------> binlog[mysql-bin.000001:2395] , name[canal_test,t_user] , eventType : UPDATE , executeTime : 1727233343000(2024-09-25 11:02:23) , gtid : () , delay : 640 ms
id : 1    type=bigint unsigned
name : 用户 1-1    type=varchar(80)    update=true
----------------
 END ----> transaction id: 710
================> binlog[mysql-bin.000001:2469] , executeTime : 1727233343000(2024-09-25 11:02:23) , gtid : () , delay : 641ms

****************************************************
* Batch Id: [6] ,count : [3] , memsize : [165] , Time : 2024-09-25 12:00:18
* Start : [mysql-bin.000001:2579:1727236818000(2024-09-25 12:00:18)] 
* End : [mysql-bin.000001:2777:1727236818000(2024-09-25 12:00:18)] 
****************************************************

================> binlog[mysql-bin.000001:2579] , executeTime : 1727236818000(2024-09-25 12:00:18) , gtid : () , delay : 302ms
 BEGIN ----> Thread id: 128
----------------> binlog[mysql-bin.000001:2724] , name[canal_test,t_user] , eventType : DELETE , executeTime : 1727236818000(2024-09-25 12:00:18) , gtid : () , delay : 303 ms
id : 2    type=bigint unsigned
name : 用户 2    type=varchar(80)
----------------
 END ----> transaction id: 2105
================> binlog[mysql-bin.000001:2777] , executeTime : 1727236818000(2024-09-25 12:00:18) , gtid : () , delay : 305ms

附 1. MySQL 8.0 的部署清单

点击查看清单详细内容
yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    deployment.kubernetes.io/revision: "7"
  creationTimestamp: "2024-04-09T09:37:59Z"
  generation: 9
  labels:
    app: mysql-8
    release: mysql-8
  name: mysql-8
  namespace: common
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 2
  selector:
    matchLabels:
      app: mysql-8
      release: mysql-8
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        app: mysql-8
        release: mysql-8
    spec:
      containers:
      - env:
        - name: TZ
          value: Asia/Shanghai
        - name: MYSQL_ROOT_PASSWORD
          valueFrom:
            secretKeyRef:
              key: mysql-root-password
              name: mysql-8-secret
        image: mysql:8.0.31
        imagePullPolicy: IfNotPresent
        livenessProbe:
          exec:
            command:
            - sh
            - -c
            - mysqladmin ping -u root -p${MYSQL_ROOT_PASSWORD}
          failureThreshold: 3
          initialDelaySeconds: 30
          periodSeconds: 10
          successThreshold: 1
          timeoutSeconds: 5
        name: mysql-8
        ports:
        - containerPort: 3306
          name: mysql
          protocol: TCP
        readinessProbe:
          exec:
            command:
            - sh
            - -c
            - mysqladmin ping -u root -p${MYSQL_ROOT_PASSWORD}
          failureThreshold: 3
          initialDelaySeconds: 5
          periodSeconds: 10
          successThreshold: 1
          timeoutSeconds: 1
        resources:
          requests:
            cpu: 100m
            memory: 256Mi
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /var/lib/mysql
          name: data
        - mountPath: /etc/mysql/conf.d/
          name: my-conf-volume
          readOnly: true
      dnsPolicy: ClusterFirst
      initContainers:
      - command:
        - rm
        - -fr
        - /var/lib/mysql/lost+found
        image: busybox:1.32
        imagePullPolicy: IfNotPresent
        name: remove-lost-found
        resources:
          requests:
            cpu: 10m
            memory: 10Mi
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /var/lib/mysql
          name: data
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      serviceAccount: default
      serviceAccountName: default
      terminationGracePeriodSeconds: 30
      volumes:
      - name: data
        persistentVolumeClaim:
          claimName: pvc-common-mysql-8
      - configMap:
          defaultMode: 420
          items:
          - key: my.cnf
            path: my.cnf
          name: mysql-8-config
        name: my-conf-volume

附 2. Canal Client example 代码

  1. SimpleCanalClientTest.java

    点击查看代码
    java
    package com.alibaba.otter.canal.example;
    
    import java.net.InetSocketAddress;
    
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.common.utils.AddressUtils;
    
    /**
     * 单机模式的测试例子
     * 
     * @author jianghang 2013-4-15 下午04:19:20
     * @version 1.0.4
     */
    public class SimpleCanalClientTest extends AbstractCanalClientTest {
    
        public SimpleCanalClientTest(String destination){
            super(destination);
        }
    
        public static void main(String args[]) {
            // 根据ip,直接创建链接,无HA的功能
            String destination = "test";
            String ip = AddressUtils.getHostIp();
            CanalConnector connector = CanalConnectors
                .newSingleConnector(new InetSocketAddress(ip, 11111), destination, "canal", "canal");
    
            final SimpleCanalClientTest clientTest = new SimpleCanalClientTest(destination);
            clientTest.setConnector(connector);
            clientTest.start();
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                try {
                    logger.info("## stop the canal client");
                    clientTest.stop();
                } catch (Throwable e) {
                    logger.warn("##something goes wrong when stopping canal:", e);
                } finally {
                    logger.info("## canal client is down.");
                }
            }));
        }
    
    }
  2. AbstractCanalClientTest.java

    点击查看代码
    java
    package com.alibaba.otter.canal.example;
    
    import org.slf4j.MDC;
    import org.springframework.util.Assert;
    
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.protocol.Message;
    
    /**
     * 测试基类
     * 
     * @author jianghang 2013-4-15 下午04:17:12
     * @version 1.0.4
     */
    public class AbstractCanalClientTest extends BaseCanalClientTest {
    
        public AbstractCanalClientTest(String destination){
            this(destination, null);
        }
    
        public AbstractCanalClientTest(String destination, CanalConnector connector){
            this.destination = destination;
            this.connector = connector;
        }
    
        protected void start() {
            Assert.notNull(connector, "connector is null");
            thread = new Thread(this::process);
    
            thread.setUncaughtExceptionHandler(handler);
            running = true;
            thread.start();
        }
    
        protected void stop() {
            if (!running) {
                return;
            }
            running = false;
            if (thread != null) {
                try {
                    thread.join();
                } catch (InterruptedException e) {
                    // ignore
                }
            }
    
            MDC.remove("destination");
        }
    
        protected void process() {
            int batchSize = 5 * 1024;
            while (running) {
                try {
                    MDC.put("destination", destination);
                    connector.connect();
                    connector.subscribe();
                    while (running) {
                        Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                        long batchId = message.getId();
                        int size = message.getEntries().size();
                        if (batchId == -1 || size == 0) {
                            // try {
                            // Thread.sleep(1000);
                            // } catch (InterruptedException e) {
                            // }
                        } else {
                            printSummary(message, batchId, size);
                            printEntry(message.getEntries());
                        }
    
                        if (batchId != -1) {
                            connector.ack(batchId); // 提交确认
                        }
                    }
                } catch (Throwable e) {
                    logger.error("process error!", e);
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e1) {
                        // ignore
                    }
    
                    connector.rollback(); // 处理失败, 回滚数据
                } finally {
                    connector.disconnect();
                    MDC.remove("destination");
                }
            }
        }
    
    }
  3. BaseCanalClientTest.java

    点击查看代码
    java
    package com.alibaba.otter.canal.example;
    
    import java.io.UnsupportedEncodingException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.List;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.commons.lang.SystemUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.util.CollectionUtils;
    
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.protocol.CanalEntry;
    import com.alibaba.otter.canal.protocol.Message;
    import com.alibaba.otter.canal.protocol.CanalEntry.*;
    import com.google.protobuf.InvalidProtocolBufferException;
    
    public class BaseCanalClientTest {
    
        protected final static Logger             logger             = LoggerFactory
            .getLogger(AbstractCanalClientTest.class);
        protected static final String             SEP                = SystemUtils.LINE_SEPARATOR;
        protected static final String             DATE_FORMAT        = "yyyy-MM-dd HH:mm:ss";
        protected volatile boolean                running            = false;
        protected Thread.UncaughtExceptionHandler handler            = (t, e) -> logger.error("parse events has an error",
            e);
        protected Thread                          thread             = null;
        protected CanalConnector                  connector;
        protected static String                   context_format     = null;
        protected static String                   row_format         = null;
        protected static String                   transaction_format = null;
        protected String                          destination;
    
        static {
            context_format = SEP + "****************************************************" + SEP;
            context_format += "* Batch Id: [{}] ,count : [{}] , memsize : [{}] , Time : {}" + SEP;
            context_format += "* Start : [{}] " + SEP;
            context_format += "* End : [{}] " + SEP;
            context_format += "****************************************************" + SEP;
    
            row_format = SEP
                        + "----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {}({}) , gtid : ({}) , delay : {} ms"
                        + SEP;
    
            transaction_format = SEP + "================> binlog[{}:{}] , executeTime : {}({}) , gtid : ({}) , delay : {}ms"
                                + SEP;
    
        }
    
        protected void printSummary(Message message, long batchId, int size) {
            long memsize = 0;
            for (Entry entry : message.getEntries()) {
                memsize += entry.getHeader().getEventLength();
            }
    
            String startPosition = null;
            String endPosition = null;
            if (!CollectionUtils.isEmpty(message.getEntries())) {
                startPosition = buildPositionForDump(message.getEntries().get(0));
                endPosition = buildPositionForDump(message.getEntries().get(message.getEntries().size() - 1));
            }
    
            SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
            logger.info(context_format,
                new Object[] { batchId, size, memsize, format.format(new Date()), startPosition, endPosition });
        }
    
        protected String buildPositionForDump(Entry entry) {
            long time = entry.getHeader().getExecuteTime();
            Date date = new Date(time);
            SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
            String position = entry.getHeader().getLogfileName() + ":" + entry.getHeader().getLogfileOffset() + ":"
                            + entry.getHeader().getExecuteTime() + "(" + format.format(date) + ")";
            if (StringUtils.isNotEmpty(entry.getHeader().getGtid())) {
                position += " gtid(" + entry.getHeader().getGtid() + ")";
            }
            return position;
        }
    
        protected void printEntry(List<Entry> entrys) {
            for (Entry entry : entrys) {
                long executeTime = entry.getHeader().getExecuteTime();
                long delayTime = new Date().getTime() - executeTime;
                Date date = new Date(entry.getHeader().getExecuteTime());
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
                if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                    if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) {
                        TransactionBegin begin = null;
                        try {
                            begin = TransactionBegin.parseFrom(entry.getStoreValue());
                        } catch (InvalidProtocolBufferException e) {
                            throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                        }
                        // 打印事务头信息,执行的线程id,事务耗时
                        logger.info(transaction_format,
                            new Object[] { entry.getHeader().getLogfileName(),
                                        String.valueOf(entry.getHeader().getLogfileOffset()),
                                        String.valueOf(entry.getHeader().getExecuteTime()),
                                        simpleDateFormat.format(date), entry.getHeader().getGtid(),
                                        String.valueOf(delayTime) });
                        logger.info(" BEGIN ----> Thread id: {}", begin.getThreadId());
                        printXAInfo(begin.getPropsList());
                    } else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
                        TransactionEnd end = null;
                        try {
                            end = TransactionEnd.parseFrom(entry.getStoreValue());
                        } catch (InvalidProtocolBufferException e) {
                            throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                        }
                        // 打印事务提交信息,事务id
                        logger.info("----------------\n");
                        logger.info(" END ----> transaction id: {}", end.getTransactionId());
                        printXAInfo(end.getPropsList());
                        logger.info(transaction_format,
                            new Object[] { entry.getHeader().getLogfileName(),
                                        String.valueOf(entry.getHeader().getLogfileOffset()),
                                        String.valueOf(entry.getHeader().getExecuteTime()),
                                        simpleDateFormat.format(date), entry.getHeader().getGtid(),
                                        String.valueOf(delayTime) });
                    }
    
                    continue;
                }
    
                if (entry.getEntryType() == EntryType.ROWDATA) {
                    RowChange rowChange = null;
                    try {
                        rowChange = RowChange.parseFrom(entry.getStoreValue());
                    } catch (Exception e) {
                        throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                    }
    
                    EventType eventType = rowChange.getEventType();
    
                    logger.info(row_format,
                        new Object[] { entry.getHeader().getLogfileName(),
                                    String.valueOf(entry.getHeader().getLogfileOffset()),
                                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType,
                                    String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
                                    entry.getHeader().getGtid(), String.valueOf(delayTime) });
    
                    if (eventType == EventType.QUERY || rowChange.getIsDdl()) {
                        logger.info("ddl : " + rowChange.getIsDdl() + " ,  sql ----> " + rowChange.getSql() + SEP);
                        continue;
                    }
    
                    printXAInfo(rowChange.getPropsList());
                    for (RowData rowData : rowChange.getRowDatasList()) {
                        if (eventType == EventType.DELETE) {
                            printColumn(rowData.getBeforeColumnsList());
                        } else if (eventType == EventType.INSERT) {
                            printColumn(rowData.getAfterColumnsList());
                        } else {
                            printColumn(rowData.getAfterColumnsList());
                        }
                    }
                }
            }
        }
    
        protected void printColumn(List<Column> columns) {
            for (Column column : columns) {
                StringBuilder builder = new StringBuilder();
                try {
                    if (StringUtils.containsIgnoreCase(column.getMysqlType(), "BLOB")
                        || StringUtils.containsIgnoreCase(column.getMysqlType(), "BINARY")) {
                        // get value bytes
                        builder.append(
                            column.getName() + " : " + new String(column.getValue().getBytes("ISO-8859-1"), "UTF-8"));
                    } else {
                        builder.append(column.getName() + " : " + column.getValue());
                    }
                } catch (UnsupportedEncodingException e) {
                }
                builder.append("    type=" + column.getMysqlType());
                if (column.getUpdated()) {
                    builder.append("    update=" + column.getUpdated());
                }
                builder.append(SEP);
                logger.info(builder.toString());
            }
        }
    
        protected void printXAInfo(List<Pair> pairs) {
            if (pairs == null) {
                return;
            }
    
            String xaType = null;
            String xaXid = null;
            for (Pair pair : pairs) {
                String key = pair.getKey();
                if (StringUtils.endsWithIgnoreCase(key, "XA_TYPE")) {
                    xaType = pair.getValue();
                } else if (StringUtils.endsWithIgnoreCase(key, "XA_XID")) {
                    xaXid = pair.getValue();
                }
            }
    
            if (xaType != null && xaXid != null) {
                logger.info(" ------> " + xaType + " " + xaXid);
            }
        }
    
        public void setConnector(CanalConnector connector) {
            this.connector = connector;
        }
    
        /**
         * 获取当前Entry的 GTID信息示例
         *
         * @param header
         * @return
         */
        public static String getCurrentGtid(CanalEntry.Header header) {
            List<CanalEntry.Pair> props = header.getPropsList();
            if (props != null && props.size() > 0) {
                for (CanalEntry.Pair pair : props) {
                    if ("curtGtid".equals(pair.getKey())) {
                        return pair.getValue();
                    }
                }
            }
            return "";
        }
    
        /**
         * 获取当前Entry的 GTID Sequence No信息示例
         *
         * @param header
         * @return
         */
        public static String getCurrentGtidSn(CanalEntry.Header header) {
            List<CanalEntry.Pair> props = header.getPropsList();
            if (props != null && props.size() > 0) {
                for (CanalEntry.Pair pair : props) {
                    if ("curtGtidSn".equals(pair.getKey())) {
                        return pair.getValue();
                    }
                }
            }
            return "";
        }
    
        /**
         * 获取当前Entry的 GTID Last Committed信息示例
         *
         * @param header
         * @return
         */
        public static String getCurrentGtidLct(CanalEntry.Header header) {
            List<CanalEntry.Pair> props = header.getPropsList();
            if (props != null && props.size() > 0) {
                for (CanalEntry.Pair pair : props) {
                    if ("curtGtidLct".equals(pair.getKey())) {
                        return pair.getValue();
                    }
                }
            }
            return "";
        }
    
    }