Canal + MySQL + MQ + ElasticSearch 实时数据同步
1. 执行摘要
在当前企业级分布式系统架构中,事务性数据库(OLTP)与分析型搜索引擎(OLAP)之间的数据实时同步,已成为核心基础设施需求。
- MySQL:承载核心业务数据的持久化与事务处理
- ElasticSearch(ES):提供全文检索、聚合分析、即席查询能力
由于 关系模型 与 文档模型 之间存在天然的 阻抗不匹配,传统的应用层双写或定时 ETL 已难以满足:
- 毫秒级延迟
- 高一致性
- 系统解耦
本报告系统性地阐述了基于 Canal + MySQL + MQ(Kafka / RocketMQ)+ ElasticSearch 的实时 CDC(Change Data Capture)架构,深入分析组件原理、关键配置、落地实践及生产环境常见问题。
2. 数据同步技术的演进与理论基础
2.1 传统同步模式的局限性
2.1.1 应用层双写
问题:
- 业务代码强耦合
- ES 写失败导致事务处理复杂
- 容易出现数据不一致
2.1.2 定时轮询(Query-based CDC)
典型方案:Logstash jdbc-input
致命缺陷:
- ❌ 无法感知物理删除(DELETE)
- ⚠ 实时性与数据库负载冲突
- ❌ 高并发场景存在漏读风险
2.2 基于日志的 CDC(Log-based CDC)
核心思想:直接解析数据库事务日志
- MySQL → Binlog
- Canal 伪装成 MySQL Slave
- 实时解析 INSERT / UPDATE / DELETE
优势:
- ✅ 低侵入
- ✅ 全量事件捕获
- ✅ 毫秒级延迟
3. 架构设计与组件选型
3.1 总体架构蓝图
MySQL
↓ (Binlog)
Canal Server
↓
Message Queue (Kafka / RocketMQ)
↓
Canal Adapter / Logstash
↓
ElasticSearch
设计思想:
- 存算分离
- 削峰填谷
- 高可用 & 可扩展
3.2 核心组件解析
3.2.1 MySQL
必须配置:
binlog_format = ROWbinlog_row_image = FULL
FULL 模式是保证 ES 文档完整性的关键
3.2.2 Canal Server
- 解析 Binlog
- 过滤库表
- 序列化并投递 MQ
- 支持 Kafka / RocketMQ 原生模式
3.2.3 Message Queue
引入 MQ 的价值:
- 解耦上下游
- 消峰填谷
- 多消费者广播
- 高可用持久化
Kafka vs RocketMQ:
| 对比项 | Kafka | RocketMQ |
|---|---|---|
| 吞吐 | 极高 | 高 |
| 延迟 | 略高 | 更低 |
| 顺序消息 | 分区级 | 原生支持 |
| 生态 | 大数据生态成熟 | Java / 阿里生态 |
3.2.4 Canal Adapter vs Logstash
| 特性 | Canal Adapter | Logstash |
|---|---|---|
| CDC 适配 | 原生 | 间接 |
| Join 支持 | ✅ SQL Join | ❌ |
| ETL 能力 | 中 | 强 |
| 运维复杂度 | 低 | 高 |
建议:
- 表 → ES 镜像同步:Canal Adapter
- 复杂清洗 / 脱敏:Logstash
4. MySQL 与 Canal Server 关键配置
4.1 MySQL 配置
[mysqld]
server_id = 1
log-bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
expire_logs_days = 7
权限配置:
CREATE USER 'canal'@'%' IDENTIFIED BY 'Canal@123456';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
4.2 Canal Server 全局配置
canal.serverMode = kafka
kafka.bootstrap.servers = 192.168.1.100:9092
kafka.acks = all
kafka.linger.ms = 1
4.3 Canal Instance 配置(顺序性关键)
canal.mq.topic=example_topic
canal.mq.partitionsNum=3
canal.mq.partitionHash=mytest.user:id
canal.mq.flatMessage=true
partitionHash 是保证数据因果一致性的核心配置
5. 传输层:Kafka / RocketMQ
5.1 Kafka 建议配置
log.retention.hours = 24 ~ 48- 分区数与 Canal 保持一致
FlatMessage 示例:
{
"database": "mytest",
"table": "user",
"type": "INSERT",
"data": [{"id": "1", "name": "Alice"}]
}
5.2 RocketMQ 配置示例
canal.serverMode = rocketMQ
rocketmq.namesrv.addr = 127.0.0.1:9876
6. Canal Adapter 实战
6.1 Adapter 全局配置
canal.conf:
mode: kafka
flatMessage: true
syncBatchSize: 1000
必须关闭自动提交 Offset:
kafka.enable.auto.commit: false
6.2 ES Mapping 配置(核心)
esMapping:
_index: user_index
_id: _id
upsert: true
sql: "select a.id as _id, a.name, b.role_name from user a left join role b on b.id=a.role_id"
SQL Join 是 Adapter 的最大优势
6.3 日期与类型映射
ES 显式映射:
"create_time": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||epoch_millis"
}
7. Logstash 作为替代方案
7.1 Kafka → ES 管道示例
input { kafka { codec => json } }
filter { }
output { elasticsearch { } }
对比总结:
- Logstash:清洗强、Join 弱
- Adapter:Join 强、逻辑简单
8. 全量数据初始化(ETL)
8.1 触发方式
curl -X POST http://127.0.0.1:8081/etl/es7/user_mapping.yml -d "params=2023-01-01 00:00:00"
支持增量 + 全量并行,最终一致
9. 生产环境常见问题
9.1 SSL 证书错误
- ES 8.x 默认 HTTPS
- 需将 CA 导入 JDK TrustStore
9.2 Mapping 爆炸
- 禁用动态映射
- 明确字段 SELECT
9.3 长事务延迟
- 避免大事务
- 增加分区 & 消费并行度
10. 结论
成功落地的关键:
- Binlog = ROW + FULL
- MQ 分区顺序保证
- 合理的 Mapping 与 ETL 设计
- 完整的监控与运维能力
通过该架构,可构建一套:
毫秒级延迟 · 高可靠 · 高扩展 的实时搜索与分析数据底座




