开源库地址
https://github.com/shyiko/mysql-binlog-connector-java(不维护)
https://github.com/osheroff/mysql-binlog-connector-java(维护中)
读取实时数据
Tapping into MySQL replication stream
PREREQUISITES: Whichever user you plan to use for the BinaryLogClient, he MUST have REPLICATION SLAVE privilege. Unless you specify binlogFilename/binlogPosition yourself (in which case automatic resolution won’t kick in), you’ll need REPLICATION CLIENT granted as well.
1 | BinaryLogClient client = new BinaryLogClient("hostname", 3306, "username", "password"); |
You can register a listener for
onConnect
/onCommunicationFailure
/onEventDeserializationFailure
/onDisconnect
usingclient.registerLifecycleListener(...)
.
By default, BinaryLogClient starts from the current (at the time of connect) master binlog position. If you wish to kick off from a specific filename or position, use
client.setBinlogFilename(filename)
+client.setBinlogPosition(position)
.
client.connect()
is blocking (meaning that client will listen for events in the current thread).client.connect(timeout)
, on the other hand, spawns a separate thread.
使用:
1 | client.setBinlogFilename(); |
如果指定了不存在的 binlog 文件名,那么 BinlogClient 的线程不会结束,但是什么反应也不会有。
如果传的参数是空字符串,会从第一个 binlog 文件开始解析。
如果指定了错误的 position,会自动调整:
1 | 警告: Binary log position adjusted from -100 to 4 |
关于执行线程
client.connect()
is blocking (meaning that client will listen for events in the current thread).client.connect(timeout)
, on the other hand, spawns a separate thread.
前者会阻塞,使用当前线程作为解析线程,后者另开一个线程。
注意事项
TABLE_MAP 事件顺序问题
每次增删改事件之前都会有 TABLE_MAP 事件告诉你接下来插入的是哪张表。但是不代表对应的增删改事件就一定紧紧跟在 TABLE_MAP 之后。(可参考:How to get TableName in WriteRowsEvent/UpdateRowsEvent/DeleteRowsEvent · Issue #67 · shyiko/mysql-binlog-connector-java)
How do I get TableName in WriteRows/UpdateRows/DeleteRows events? I tried using TABLE_MAP event, but the problem is - if the table(A),being updated, has some trigger on it for a different table(B), it gives the wrong table name (B). I would like to add a custom deserializer and use tableMapEventByTableId hashmap to get table name by providing tableId there, but there is no way to set original EventDeserializer.tableMapEventByTableId as tablemap for new deserializer. Can you please suggest how to get tablename in my situation. Thanks.
在有触发器的场景下,TABLE_MAP 和后边的增删改事件可能不会准确对应。
断开 binlog 重新连接,如果连接在事务中心的问题
如果连接上去的 binlog position 正好是在一个事务中间,那么因为没有 TABLE_MAP 事件,所以中间会有一部分数据不知道是哪张表的,所以或许存在可能会丢数据。
但是实际上在 Event 处理时,如果调用 BinaryLogClient.getBinlogPosition()
获取 binlog 位置,获取到的总是 TABLE_MAP 事件的 binlog 位置,而不是 WRITE_ROWS 等事件的位置。所以说我们重新连接的时候总是会从 TABLE_MAP 事件开始连接的。
tableMapEventByTableId 不断增加问题
只要保持连接,BinlogClient 中缓存的 tableMapEventByTableId 就会一直增加,持续占用大量内存。
TableMapEventData instance increase always · Issue #123 · shyiko/mysql-binlog-connector-java
最新版已解决:https://github.com/osheroff/mysql-binlog-connector-java/pull/2
手动触发 table_id 变化的方法:
1 | BEGIN; |
tableId 复用问题:
如果数据库的表很多,那么就有可能发生 table_id 的复用。
另外,重启数据库可以使 table_id 归零,重启数据库之后,再写入数据的时候,会重新给每张表分配 table_id。
可以多看看 CHANGELOG
CHANGELOG.md
中包含了很多注意事项,包括和非正式 MySQL 发行版的兼容性问题,比如 AWS Aurora
。
断点/binlog 位置相关
以如下 binlog 为例
Log_name | Pos | Event_type | Server_id | End_log_pos | Info |
---|---|---|---|---|---|
mysql-bin.000152 | 39802822 | Anonymous_Gtid | 123 | 39802887 | SET @@SESSION.GTID_NEXT= ‘ANONYMOUS’ |
mysql-bin.000152 | 39802887 | Query | 123 | 39802960 | BEGIN |
mysql-bin.000152 | 39802960 | Table_map | 123 | 39803055 | table_id: 128 (fdldb.fine_dp_pipeline_execute) |
mysql-bin.000152 | 39803055 | Update_rows | 123 | 39803507 | table_id: 128 flags: STMT_END_F |
mysql-bin.000152 | 39803507 | Xid | 123 | 39803538 | COMMIT /* xid=657373 */ |
mysql-bin.000152 | 39803538 | Anonymous_Gtid | 123 | 39803603 | SET @@SESSION.GTID_NEXT= ‘ANONYMOUS’ |
mysql-bin.000152 | 39803603 | Query | 123 | 39803679 | BEGIN |
mysql-bin.000152 | 39803679 | Table_map | 123 | 39803751 | table_id: 399 (ddl_test.ddl_test1) |
mysql-bin.000152 | 39803751 | Update_rows | 123 | 39803919 | table_id: 399 flags: STMT_END_F |
mysql-bin.000152 | 39803919 | Xid | 123 | 39803950 | COMMIT /* xid=657399 */ |
在 EventListener 中处理事件的时候,从 this.client.getBinlogPosition() 获取到的是39803679,是tableMap的StartPos。
2023-03-23 09:55:41 确认在debezium中,使用的是 Update_rows 的 End_log_pos 作为断点,重启时候有可能丢数据。(根据FDL中参考的debezium的代码分析得来,没有直接看debezium源代码,需要进一步确认)
重启可能丢数据是因为一个Event中可能有多条数据,而我们是一条一条发送的,所以如果在发送中途,平台宕机或断点,重启后会直接跳过这个Update_rows的Event。
常见问题
报错堆栈记录1
1 | 2020-10-26 23:56:59.898 WARN 23080 --- [-localhost:3306] c.g.shyiko.mysql.binlog.BinaryLogClient : com.fr.finetube.core.work.step.component.pipeline.binlog.PipelineTaskBinlogListener@4b655caa choked on Event{header=EventHeaderV4{timestamp=1603727819000, eventType=EXT_WRITE_ROWS, serverId=123, headerLength=19, dataLength=44, nextPosition=17168452, flags=0}, data=WriteRowsEventData{tableId=161, includedColumns={0, 1}, rows=[ |
A slave with the same server_uuid/server_id as this slave
1 | A slave with the same server_uuid/server_id as this slave has connected to the master... |
Same Error Code · Issue #208 · shyiko/mysql-binlog-connector-java
Blocking Mode Problem · Issue #121 · shyiko/mysql-binlog-connector-java
每个 BinlogClient 都相当于是一个 MySQL slave,每个 slave server 都必须有一个唯一的 id,所以需要调用 client.setServerId()
为每一个 BinaryLogClient 设定 serverId,不然多个 client 之间会相互干扰。
数据类型
本章节介绍该库解析出来的数据的数据类型映射。即 MySQL 中的对应类型发生变更后,通过本库解析出来的对应的 Java 类型是什么。
研究 com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer
即可。
1 | ColumnType.TINY: Integer |
这个是基于 open-replicator - Google Code Archive (whitesock/open-replicator)进行解析的。
其他
MySQL Binlog 解析组件 open-replicator 原理介绍
整形变字节数组
Unsigned integer opt · Issue #16 · osheroff/mysql-binlog-connector-java
源码分析
解析数据的代码 AbstractRowsEventDataDeserializer
AbstractRowsEventDataDeserializer 中有从 binlog 解析为 JAVA 中数据类型的代码:
1 | protected Serializable deserializeCell(ColumnType type, int meta, int length, ByteArrayInputStream inputStream) throws IOException { |
KeepAlive 问题
How is keepalive supposed to work? · Issue #118 · shyiko/mysql-binlog-connector-java
Maxwell 就没有使用使用默认的 KeepAlive 机制:BinlogConnectorReplicator: add heartbeat detection [MELINF-2251] by timbertson-zd · Pull Request #1643 · zendesk/maxwell
而是自己实现了一套检测和重试机制。
知名开源项目使用
debezium、maxwell 都使用了这个库。
已知问题
可能解析中文字段名乱码问题
曾经遇到一个问题,读取 binlog 中的 DDL 的时候,解析出来的中文字段名都是乱码。排查后使用 Java 时没有设置为 UTF-8 编码,而 mysql-binlog-connector-java 读取 DDL 事件的时候使用的是 Java 的编码,导致解析出来的数据是乱码。启动 Java 时把编码设置为 UTF8(加上 -Dfile.encoding=UTF-8
启动参数)即可解决。
为什么把编码改成 UTF8 就没有问题呢?原因是 binlog 中保存的字段名等元数据信息都是以 UTF8 编码格式保存的,因为 MySQL 的系统编码(character_set_system
)格式就是 UTF8 且是在源代码中硬编码不能修改的。所以读取 binlog 中的字段名等信息的时候也要以 UTF8 编码格式来读取,而 mysql-binlog-connector-java 这个库,读取 binlog 中的字段名的时候,使用的是 Java 设置的编码,如果 Java 设置的编码不是 UTF8,那么读取中文字段等非英文字符的时候就可能出现乱码问题。所以 mysql-binlog-connector-java 这个库有这个缺陷还需要优化。
关于 character_set_system
,可以参考:
mysql 中有关字符集 character_set_xxx 系统参数的整理 - MySQL - NICECHI 博客
MySQL选择UTF-8作为元数据编码,用源码固定
问题位置:
QueryEventDataDeserializer 使用 ByteArrayInputStream 的 readString 方法,readString 方法中没有强制使用 UTF8 编码来处理 binlog 中的 SQL 数据。