Canal + MySQL + MQ + ElasticSearch 实时数据同步
542
类别: 
开发交流

1. 执行摘要

在当前企业级分布式系统架构中,事务性数据库(OLTP)与分析型搜索引擎(OLAP)之间的数据实时同步,已成为核心基础设施需求。

  • MySQL:承载核心业务数据的持久化与事务处理
  • ElasticSearch(ES):提供全文检索、聚合分析、即席查询能力

由于 关系模型文档模型 之间存在天然的 阻抗不匹配,传统的应用层双写或定时 ETL 已难以满足:

  • 毫秒级延迟
  • 高一致性
  • 系统解耦

本报告系统性地阐述了基于 Canal + MySQL + MQ(Kafka / RocketMQ)+ ElasticSearch 的实时 CDC(Change Data Capture)架构,深入分析组件原理、关键配置、落地实践及生产环境常见问题。


2. 数据同步技术的演进与理论基础

2.1 传统同步模式的局限性

2.1.1 应用层双写

问题:

  • 业务代码强耦合
  • ES 写失败导致事务处理复杂
  • 容易出现数据不一致

2.1.2 定时轮询(Query-based CDC)

典型方案:Logstash jdbc-input

致命缺陷:

  1. ❌ 无法感知物理删除(DELETE)
  2. ⚠ 实时性与数据库负载冲突
  3. ❌ 高并发场景存在漏读风险

2.2 基于日志的 CDC(Log-based CDC)

核心思想:直接解析数据库事务日志

  • MySQL → Binlog
  • Canal 伪装成 MySQL Slave
  • 实时解析 INSERT / UPDATE / DELETE

优势:

  • ✅ 低侵入
  • ✅ 全量事件捕获
  • ✅ 毫秒级延迟

3. 架构设计与组件选型

3.1 总体架构蓝图

MySQL
  ↓ (Binlog)
Canal Server
  ↓
Message Queue (Kafka / RocketMQ)
  ↓
Canal Adapter / Logstash
  ↓
ElasticSearch

设计思想:

  • 存算分离
  • 削峰填谷
  • 高可用 & 可扩展

3.2 核心组件解析

3.2.1 MySQL

必须配置:

  • binlog_format = ROW
  • binlog_row_image = FULL

FULL 模式是保证 ES 文档完整性的关键

3.2.2 Canal Server

  • 解析 Binlog
  • 过滤库表
  • 序列化并投递 MQ
  • 支持 Kafka / RocketMQ 原生模式

3.2.3 Message Queue

引入 MQ 的价值:

  • 解耦上下游
  • 消峰填谷
  • 多消费者广播
  • 高可用持久化

Kafka vs RocketMQ:

对比项KafkaRocketMQ
吞吐极高
延迟略高更低
顺序消息分区级原生支持
生态大数据生态成熟Java / 阿里生态

3.2.4 Canal Adapter vs Logstash

特性Canal AdapterLogstash
CDC 适配原生间接
Join 支持✅ SQL Join
ETL 能力
运维复杂度

建议:

  • 表 → ES 镜像同步:Canal Adapter
  • 复杂清洗 / 脱敏:Logstash

4. MySQL 与 Canal Server 关键配置

4.1 MySQL 配置

[mysqld]
server_id = 1
log-bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
expire_logs_days = 7

权限配置:

CREATE USER 'canal'@'%' IDENTIFIED BY 'Canal@123456';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

4.2 Canal Server 全局配置

canal.serverMode = kafka
kafka.bootstrap.servers = 192.168.1.100:9092
kafka.acks = all
kafka.linger.ms = 1

4.3 Canal Instance 配置(顺序性关键)

canal.mq.topic=example_topic
canal.mq.partitionsNum=3
canal.mq.partitionHash=mytest.user:id
canal.mq.flatMessage=true

partitionHash 是保证数据因果一致性的核心配置


5. 传输层:Kafka / RocketMQ

5.1 Kafka 建议配置

  • log.retention.hours = 24 ~ 48
  • 分区数与 Canal 保持一致

FlatMessage 示例:

{
  "database": "mytest",
  "table": "user",
  "type": "INSERT",
  "data": [{"id": "1", "name": "Alice"}]
}

5.2 RocketMQ 配置示例

canal.serverMode = rocketMQ
rocketmq.namesrv.addr = 127.0.0.1:9876

6. Canal Adapter 实战

6.1 Adapter 全局配置

canal.conf:
  mode: kafka
  flatMessage: true
  syncBatchSize: 1000

必须关闭自动提交 Offset:

kafka.enable.auto.commit: false

6.2 ES Mapping 配置(核心)

esMapping:
  _index: user_index
  _id: _id
  upsert: true
  sql: "select a.id as _id, a.name, b.role_name from user a left join role b on b.id=a.role_id"

SQL Join 是 Adapter 的最大优势


6.3 日期与类型映射

ES 显式映射:

"create_time": {
  "type": "date",
  "format": "yyyy-MM-dd HH:mm:ss||epoch_millis"
}

7. Logstash 作为替代方案

7.1 Kafka → ES 管道示例

input { kafka { codec => json } }
filter { }
output { elasticsearch { } }

对比总结:

  • Logstash:清洗强、Join 弱
  • Adapter:Join 强、逻辑简单

8. 全量数据初始化(ETL)

8.1 触发方式

curl -X POST http://127.0.0.1:8081/etl/es7/user_mapping.yml -d "params=2023-01-01 00:00:00"

支持增量 + 全量并行,最终一致


9. 生产环境常见问题

9.1 SSL 证书错误

  • ES 8.x 默认 HTTPS
  • 需将 CA 导入 JDK TrustStore

9.2 Mapping 爆炸

  • 禁用动态映射
  • 明确字段 SELECT

9.3 长事务延迟

  • 避免大事务
  • 增加分区 & 消费并行度

10. 结论

成功落地的关键:

  • Binlog = ROW + FULL
  • MQ 分区顺序保证
  • 合理的 Mapping 与 ETL 设计
  • 完整的监控与运维能力

通过该架构,可构建一套:

毫秒级延迟 · 高可靠 · 高扩展 的实时搜索与分析数据底座

标签:
评论 0
/ 1000
0
0
收藏