开源库地址

https://github.com/shyiko/mysql-binlog-connector-java(不维护)

https://github.com/osheroff/mysql-binlog-connector-java(维护中)

读取实时数据

Tapping into MySQL replication stream

PREREQUISITES: Whichever user you plan to use for the BinaryLogClient, he MUST have REPLICATION SLAVE privilege. Unless you specify binlogFilename/binlogPosition yourself (in which case automatic resolution won’t kick in), you’ll need REPLICATION CLIENT granted as well.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
BinaryLogClient client = new BinaryLogClient("hostname", 3306, "username", "password");
EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setCompatibilityMode(
EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
);
client.setEventDeserializer(eventDeserializer);
client.registerEventListener(new EventListener() {

@Override
public void onEvent(Event event) {
...
}
});
client.connect();

You can register a listener for onConnect / onCommunicationFailure / onEventDeserializationFailure / onDisconnect using client.registerLifecycleListener(...).

By default, BinaryLogClient starts from the current (at the time of connect) master binlog position. If you wish to kick off from a specific filename or position, use client.setBinlogFilename(filename) + client.setBinlogPosition(position).

client.connect() is blocking (meaning that client will listen for events in the current thread). client.connect(timeout), on the other hand, spawns a separate thread.

使用:

1
2
client.setBinlogFilename();
client.setBinlogPosition();

如果指定了不存在的 binlog 文件名,那么 BinlogClient 的线程不会结束,但是什么反应也不会有。

如果传的参数是空字符串,会从第一个 binlog 文件开始解析。

如果指定了错误的 position,会自动调整:

1
警告: Binary log position adjusted from -100 to 4

关于执行线程

client.connect() is blocking (meaning that client will listen for events in the current thread). client.connect(timeout), on the other hand, spawns a separate thread.

前者会阻塞,使用当前线程作为解析线程,后者另开一个线程。

注意事项

TABLE_MAP 事件顺序问题

每次增删改事件之前都会有 TABLE_MAP 事件告诉你接下来插入的是哪张表。但是不代表对应的增删改事件就一定紧紧跟在 TABLE_MAP 之后。(可参考:How to get TableName in WriteRowsEvent/UpdateRowsEvent/DeleteRowsEvent · Issue #67 · shyiko/mysql-binlog-connector-java

How do I get TableName in WriteRows/UpdateRows/DeleteRows events? I tried using TABLE_MAP event, but the problem is - if the table(A),being updated, has some trigger on it for a different table(B), it gives the wrong table name (B). I would like to add a custom deserializer and use tableMapEventByTableId hashmap to get table name by providing tableId there, but there is no way to set original EventDeserializer.tableMapEventByTableId as tablemap for new deserializer. Can you please suggest how to get tablename in my situation. Thanks.

在有触发器的场景下,TABLE_MAP 和后边的增删改事件可能不会准确对应。

断开 binlog 重新连接,如果连接在事务中心的问题

MissingTableMapEventException on multiple-rows transaction situation. · Issue #247 · shyiko/mysql-binlog-connector-java

如果连接上去的 binlog position 正好是在一个事务中间,那么因为没有 TABLE_MAP 事件,所以中间会有一部分数据不知道是哪张表的,所以或许存在可能会丢数据。

但是实际上在 Event 处理时,如果调用 BinaryLogClient.getBinlogPosition() 获取 binlog 位置,获取到的总是 TABLE_MAP 事件的 binlog 位置,而不是 WRITE_ROWS 等事件的位置。所以说我们重新连接的时候总是会从 TABLE_MAP 事件开始连接的。

tableMapEventByTableId 不断增加问题

只要保持连接,BinlogClient 中缓存的 tableMapEventByTableId 就会一直增加,持续占用大量内存。

TableMapEventData instance increase always · Issue #123 · shyiko/mysql-binlog-connector-java

最新版已解决:https://github.com/osheroff/mysql-binlog-connector-java/pull/2

手动触发 table_id 变化的方法:

1
2
3
4
5
BEGIN;
INSERT INTO t1 VALUES (1);
FLUSH TABLES;
INSERT INTO t1 VALUES (2);
COMMIT;

tableId 复用问题:

Maxwell wrongly interprets the database and table on some binlog events · Issue #546 · zendesk/maxwell

如果数据库的表很多,那么就有可能发生 table_id 的复用。

另外,重启数据库可以使 table_id 归零,重启数据库之后,再写入数据的时候,会重新给每张表分配 table_id。

可以多看看 CHANGELOG

CHANGELOG.md 中包含了很多注意事项,包括和非正式 MySQL 发行版的兼容性问题,比如 AWS Aurora

断点/binlog 位置相关

以如下 binlog 为例

Log_name Pos Event_type Server_id End_log_pos Info
mysql-bin.000152 39802822 Anonymous_Gtid 123 39802887 SET @@SESSION.GTID_NEXT= ‘ANONYMOUS’
mysql-bin.000152 39802887 Query 123 39802960 BEGIN
mysql-bin.000152 39802960 Table_map 123 39803055 table_id: 128 (fdldb.fine_dp_pipeline_execute)
mysql-bin.000152 39803055 Update_rows 123 39803507 table_id: 128 flags: STMT_END_F
mysql-bin.000152 39803507 Xid 123 39803538 COMMIT /* xid=657373 */
mysql-bin.000152 39803538 Anonymous_Gtid 123 39803603 SET @@SESSION.GTID_NEXT= ‘ANONYMOUS’
mysql-bin.000152 39803603 Query 123 39803679 BEGIN
mysql-bin.000152 39803679 Table_map 123 39803751 table_id: 399 (ddl_test.ddl_test1)
mysql-bin.000152 39803751 Update_rows 123 39803919 table_id: 399 flags: STMT_END_F
mysql-bin.000152 39803919 Xid 123 39803950 COMMIT /* xid=657399 */

在 EventListener 中处理事件的时候,从 this.client.getBinlogPosition() 获取到的是39803679,是tableMap的StartPos。

2023-03-23 09:55:41 确认在debezium中,使用的是 Update_rows 的 End_log_pos 作为断点,重启时候有可能丢数据。(根据FDL中参考的debezium的代码分析得来,没有直接看debezium源代码,需要进一步确认)

重启可能丢数据是因为一个Event中可能有多条数据,而我们是一条一条发送的,所以如果在发送中途,平台宕机或断点,重启后会直接跳过这个Update_rows的Event。

常见问题

报错堆栈记录1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
2020-10-26 23:56:59.898  WARN 23080 --- [-localhost:3306] c.g.shyiko.mysql.binlog.BinaryLogClient  : com.fr.finetube.core.work.step.component.pipeline.binlog.PipelineTaskBinlogListener@4b655caa choked on Event{header=EventHeaderV4{timestamp=1603727819000, eventType=EXT_WRITE_ROWS, serverId=123, headerLength=19, dataLength=44, nextPosition=17168452, flags=0}, data=WriteRowsEventData{tableId=161, includedColumns={0, 1}, rows=[
[2020-10-26 23:56:59, test]
]}}

java.lang.NullPointerException: null
at com.fr.finetube.core.work.step.component.pipeline.binlog.PipelineTaskBinlogListener.generateEvents(PipelineTaskBinlogListener.java:249)
at com.fr.finetube.core.work.step.component.pipeline.binlog.PipelineTaskBinlogListener.handleInsert(PipelineTaskBinlogListener.java:206)
at com.fr.finetube.core.work.step.component.pipeline.binlog.PipelineTaskBinlogListener.onEvent(PipelineTaskBinlogListener.java:178)
at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1158)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1005)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connectWithTimeout(BinaryLogClient.java:517)
at com.github.shyiko.mysql.binlog.BinaryLogClient.access$1100(BinaryLogClient.java:90)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:881)
at java.lang.Thread.run(Thread.java:748)

A slave with the same server_uuid/server_id as this slave

1
A slave with the same server_uuid/server_id as this slave has connected to the master...

Same Error Code · Issue #208 · shyiko/mysql-binlog-connector-java

SocketException when running multiple BinaryLogClient processes · Issue #185 · shyiko/mysql-binlog-connector-java

Blocking Mode Problem · Issue #121 · shyiko/mysql-binlog-connector-java

每个 BinlogClient 都相当于是一个 MySQL slave,每个 slave server 都必须有一个唯一的 id,所以需要调用 client.setServerId() 为每一个 BinaryLogClient 设定 serverId,不然多个 client 之间会相互干扰。

数据类型

本章节介绍该库解析出来的数据的数据类型映射。即 MySQL 中的对应类型发生变更后,通过本库解析出来的对应的 Java 类型是什么。

研究 com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer 即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
ColumnType.TINY: Integer
ColumnType.SHORT: Integer
ColumnType.LONG: Integer
ColumnType.INT24: Integer
ColumnType.YEAR: Integer
ColumnType.ENUM: Integer
ColumnType.SET: Long
ColumnType.LONGLONG: Long
ColumnType.FLOAT: Float
ColumnType.DOUBLE: Double
ColumnType.BIT: java.util.BitSet
ColumnType.DATETIME: java.util.Date
ColumnType.DATETIME_V2: java.util.Date
ColumnType.NEWDECIMAL: java.math.BigDecimal
ColumnType.TIMESTAMP: java.sql.Timestamp
ColumnType.TIMESTAMP_V2: java.sql.Timestamp
ColumnType.DATE: java.sql.Date
ColumnType.TIME: java.sql.Time
ColumnType.TIME_V2: java.sql.Time
ColumnType.VARCHAR: String
ColumnType.VAR_STRING: String
ColumnType.STRING: String
ColumnType.BLOB: byte[]
ColumnType.GEOMETRY: byte[]

这个是基于 open-replicator - Google Code Archivewhitesock/open-replicator)进行解析的。

其他

MySQL Binlog 解析组件 open-replicator 原理介绍

整形变字节数组

Unsigned integer opt · Issue #16 · osheroff/mysql-binlog-connector-java

Add option to parse integer as byte array by zzt93 · Pull Request #21 · osheroff/mysql-binlog-connector-java

源码分析

解析数据的代码 AbstractRowsEventDataDeserializer

AbstractRowsEventDataDeserializer 中有从 binlog 解析为 JAVA 中数据类型的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
protected Serializable deserializeCell(ColumnType type, int meta, int length, ByteArrayInputStream inputStream) throws IOException {
switch(type) {
case BIT:
return this.deserializeBit(meta, inputStream);
case TINY:
return this.deserializeTiny(inputStream);
case SHORT:
return this.deserializeShort(inputStream);
case INT24:
return this.deserializeInt24(inputStream);
case LONG:
return this.deserializeLong(inputStream);
case LONGLONG:
return this.deserializeLongLong(inputStream);
// ...
}

KeepAlive 问题

How is keepalive supposed to work? · Issue #118 · shyiko/mysql-binlog-connector-java

Maxwell 就没有使用使用默认的 KeepAlive 机制:BinlogConnectorReplicator: add heartbeat detection [MELINF-2251] by timbertson-zd · Pull Request #1643 · zendesk/maxwell

而是自己实现了一套检测和重试机制。

知名开源项目使用

debezium、maxwell 都使用了这个库。

已知问题

可能解析中文字段名乱码问题

曾经遇到一个问题,读取 binlog 中的 DDL 的时候,解析出来的中文字段名都是乱码。排查后使用 Java 时没有设置为 UTF-8 编码,而 mysql-binlog-connector-java 读取 DDL 事件的时候使用的是 Java 的编码,导致解析出来的数据是乱码。启动 Java 时把编码设置为 UTF8(加上 -Dfile.encoding=UTF-8 启动参数)即可解决。

为什么把编码改成 UTF8 就没有问题呢?原因是 binlog 中保存的字段名等元数据信息都是以 UTF8 编码格式保存的,因为 MySQL 的系统编码(character_set_system)格式就是 UTF8 且是在源代码中硬编码不能修改的。所以读取 binlog 中的字段名等信息的时候也要以 UTF8 编码格式来读取,而 mysql-binlog-connector-java 这个库,读取 binlog 中的字段名的时候,使用的是 Java 设置的编码,如果 Java 设置的编码不是 UTF8,那么读取中文字段等非英文字符的时候就可能出现乱码问题。所以 mysql-binlog-connector-java 这个库有这个缺陷还需要优化。

关于 character_set_system,可以参考:

mysql 中有关字符集 character_set_xxx 系统参数的整理 - MySQL - NICECHI 博客

MySQL选择UTF-8作为元数据编码,用源码固定

——PolarDB 数据库内核月报

问题位置:

mysql-binlog-connector-java/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/QueryEventDataDeserializer.java at 825f21f574e17bfab864eb88818f1611bdf7f37a · osheroff/mysql-binlog-connector-java

QueryEventDataDeserializer 使用 ByteArrayInputStream 的 readString 方法,readString 方法中没有强制使用 UTF8 编码来处理 binlog 中的 SQL 数据。