23

腾讯大牛教你ClickHouse实时同步MySQL数据

 3 years ago
source link: https://mp.weixin.qq.com/s?__biz=MjM5ODEzNDA4OA%3D%3D&%3Bmid=2650317655&%3Bidx=1&%3Bsn=7348f7f930d26242628538a873a0fe76
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

| 作者    史鹏宙,CSIG云与智慧产业事业群研发工程师

ClickHouse作为OLAP分析引擎已经被广泛使用,数据的导入导出是用户面临的第一个问题。由于ClickHouse本身无法很好地支持单条大批量的写入,因此在实时同步数据方面需要借助其他服务协助。本文给出一种结合Canal+Kafka的方案,并且给出在多个MySQL实例分库分表的场景下,如何将多张MySQL数据表写入同一 张ClickHouse表的方法,欢迎大家批评指正。

首先来看看我们的需求背景:

1. 实时同步多个MySQL实例数据到ClickHouse,每天规模500G,记录数目亿级别,可以接受分钟级别的同步延迟;

2. 某些数据库表存在分库分表的操作,用户需要跨MySQL实例跨数据库的表同步到ClickHouse的一张表中;

3. 现有的MySQL binlog开源组件( Canal ),无法做到多张源数据表到一张目的表的映射关系。

基本原理

一、 使用JDBC方式同步

1. 使用Canal组件完成binlog的解析和数据同步;

2. Canal-Server进程会伪装成MySQL的slave,使用MySQL的binlog同步协议完成数据同步;

3. Canal-Adapter进程负责从canal-server获取解析后的binlog,并且通过jdbc接口写入到ClickHouse;

ZnIjAjR.png!mobile

优点:

1. Canal组件原生支持; 

缺点:

1. Canal-Adpater写入时源表和目的表一一对应,灵活性不足;

2. 需要维护两个Canal组件进程;

二、Kafka+ClickHouse物化视图方式同步

1. Canal-Server完成binlog的解析,并且将解析后的json写入Kafka;

2. Canal-Server可以根据正则表达式过滤数据库和表名,并且根据规则写入Kafka的topic;

3. ClickHouse使用KafkaEngine和Materialized View完成消息消费,并写入本地表;

qimIBj.png!mobile

优点:

1. Kafka支持水平扩展,可以根据数据规模调整partition数目;

2. Kafka引入后将写入请求合并,防止ClickHouse生成大量的小文件,从而影响查询性能;

3. Canal-Server支持规则过滤,可以灵活配置上游的MySQL实例的数据库名和表名,并且指明写入的Kafka topic名称;

缺点:

1. 需要维护Kafka和配置规则;

2. ClickHouse需要新建相关的视图、Kafka Engine的外表等;

具体步骤

一、准备工作

1. 如果使用TencentDB,则在控制台确认binlog_format为ROW,无需多余操作。

m6bAZjN.png!mobile

如果是自建MySQL,则在客户端中查询变量:

>   show variables like '%binlog%';

+-----------------------------------------+----------------------+

| Variable_name                           | Value                |

+-----------------------------------------+----------------------+

| binlog_format                           | ROW                  |

+-----------------------------------------+----------------------+

> show variables like '%log_bin%';

+---------------------------------+--------------------------------------------+

| Variable_name                   | Value                                      |

+---------------------------------+--------------------------------------------+

| log_bin                         | ON                                         |

| log_bin_basename                |  /data/mysql_root/log/20146/mysql-bin        |

| log_bin_index                   |  /data/mysql_root/log/20146/mysql-bin.index |

+---------------------------------+--------------------------------------------+

2. 创建账号canal,用于同步binlog

CREATE USER canal IDENTIFIED BY  'canal'; 

GRANT SELECT, REPLICATION SLAVE,  REPLICATION CLIENT ON *.* TO 'canal'@'%';

FLUSH PRIVILEGES;

二、Canal组件部署

前置条件:

Canal组件部署的机器需要跟ClickHouse服务和MySQL网络互通;

需要在机器上部署java8,配置JAVA_HOME、PATH等环境变量;

基本概念:

2AnuUfE.png!mobile

1. Canal-Server组件部署

Canal-Server的主要作用是订阅binlog信息并解析和定义instance相关信息,建议每个Canal-Server进程对应一个MySQL实例;

1)下载canal.deployer-1.1.4.tar.gz,解压

2)修改配置文件conf/canal.properties,需要关注的配置如下:

...

# 端口相关信息,如果同一台机器部署多个进程需要修改

canal.port = 11111

canal.metrics.pull.port = 11112

canal.admin.port = 11110

...

# 服务模式

canal.serverMode = tcp

...

# Kafka地址

canal.mq.servers = 172.21.48.11:9092

# 使用消息队列时 这两个值必须为true

canal.mq.flatMessage = true

canal.mq.flatMessage.onlyData = true

...

# instance列表,conf目录下必须有同名的目录

canal.destinations = example,example2

3)配置instance

可以参照example新增新的instance,主要修改配置文件conf/${instance_name}/instance.properties文件。    

样例1:  同步某个数据库的以XX前缀开头的表

订阅 172.21.48.35的MySQL的testdb数据库中的以tb_开头的表的数据变更(例如tb_20200801 、 tb_20200802等),主要的步骤如下:

步骤1: 创建example2实例: cddeployer/conf && cp -r example example2

步骤2: 修改deployer/conf/example2/instance.properties文件

...

# 上游MySQL实例地址

canal.instance.master.address=172.21.48.35:3306

...

# 同步账户信息

canal.instance.dbUsername=canal

canal.instance.dbPassword=canal

...

# 过滤数据库名称和表名

canal.instance.filter.regex=testdb\\.tb_.*,

步骤3: 在conf/canal.properties中修改 canal.destinations ,新增example 2

样例2:  同步多个数据库的以XX前缀开头的表,且输出到Kafka

订阅 172.21.48.35的MySQL的empdb_0数据库的employees_20200801表,empdb_1数据库的employees_20200802表,并且数据写入Kafka;

步骤1: 创建example2实例: cddeployer/conf && cp -r example example3

步骤2: 修改deployer/conf/example3/instance.properties文件

...

# 上游MySQL实例地址

canal.instance.master.address=172.21.48.35:3306

...

# 同步账户信息

canal.instance.dbUsername=canal

canal.instance.dbPassword=canal

...

# 过滤数据库名称和表名

canal.instance.filter.regex=empdb_.*\\.employees_.*

...

# Kafka的topic名称和匹配的规则

canal.mq.dynamicTopic=employees_topic:empdb_.*\\.employees_.*

canal.mq.partition=0

# Kafka topic的分区数目(即partition数目)

canal.mq.partitionsNum=3

# 根据employees_开头的表中的 emp_no字段来进行数据hash,分布到不同的partition

canal.mq.partitionHash=empdb_.*\\.employees_.*:emp_no

步骤3: 在Kafka中新建topic employees_topic,指定分区数目为3

步骤4: 在conf/canal.properties中修改 canal.destinations ,新增example3; 修改服务模式为kafka,配置kafka相关信息;

# 服务模式

canal.serverMode = kafka

...

# Kafka地址

canal.mq.servers = 172.21.48.11:9092

# 使用消息队列时 这两个值必须为true

canal.mq.flatMessage = true

canal.mq.flatMessage.onlyData = true

...

# instance列表,conf目录下必须有同名的目录

canal.destinations =  example,example2,example3

2. Canal-Adapter组件部署(只针对方案一)

Canal-Adapter的主要作用是通过JDBC接口写入ClickHouse数据,可以配置多个表的写入;

1)下载canal.adapter-1.1.4.tar.gz,解压;

2)在lib目录下新增clickhouse驱动jar包及httpclient的jar包 httpcore-4.4.13.jar、httpclient-4.3.3.jar、clickhouse-jdbc-0.2.4.jar;

3)修改配置文件conf/application.yml文件,修改canalServerHost、srcDataSources、canalAdapters的配置;

server:

port: 8081

spring:

jackson:

date-format: yyyy-MM-dd HH:mm:ss

time-zone: GMT+8

default-property-inclusion: non_null

canal.conf:

mode: tcp

canalServerHost: 127.0.0.1:11111   # canal-server的服务地址

batchSize: 500

syncBatchSize: 1000

retries: 0

timeout:

accessKey:

secretKey:

#  MySQL的配置,修改用户名密码及制定数据库

srcDataSources:

defaultDS:

url: jdbc:mysql://172.21.48.35:3306

username: root

password: yourpasswordhere

canalAdapters:

-  instance: example

groups:

- groupId: g1

outerAdapters:

- name: logger

- name: rdb

key: mysql1

# clickhouse的配置,修改用户名密码数据库

properties:

jdbc.driverClassName: ru.yandex.clickhouse.ClickHouseDriver

jdbc.url: jdbc:clickhouse://172.21.48.18:8123

jdbc.username: default

jdbc.password:

4)修改配置文件conf/rdb/mytest_user.yml文件

dataSourceKey: defaultDS

destination: example

groupId: g1

outerAdapterKey: mysql1

concurrent: true

dbMapping:

database:  testdb

mirrorDb: true

上述的配置文件中,由于开启了mirrorDb: true,目的端的ClickHouse必须有相同的数据库名和表名。

样例1:源数据库与目标数据库名字不同,源表名与目标表名不同

修改adapter的conf/rdb/mytest_user.yml配置文件,指定源数据库和目标数据库

dataSourceKey: defaultDS

destination: example

groupId: g1

outerAdapterKey: mysql1

concurrent: true

dbMapping:

database: source_database_name

table: source_table

targetTable: destination_database_name.destination_table

targetColumns:

id:

name:

commitBatch:  3000 # 批量提交的大小

样例2:多个源数据库表写入目的端的同一张表

在conf/rdb 目录配置多个yml文件,分别指明不同的table名称。

Kafka 服务配置

一、 调整合理的producer参数

确认Canal-Server里的canal.properties文件,重要参数见下表;

配置项

Kafka SDK配置项

配置说明

canal.mq .servers

bootstrap.servers

Kafka服务地址

canal.mq .retries

retries

producer在发送失败的时候会重试次数,默认为0

canal.mq .batchSize

batch.size

producer尝试发送同一个partition的请求数据量,默认为16K

canal.mq .maxRequestSize

max.request.size

producer请求的最大大小,默认为1M

canal.mq .lingerMs

linger.ms

producer等待发送的延迟,默认为100ms

canal.mq .bufferMemory

buffer.memory

producer使用的缓存消息的最大内存,默认为30M

canal.mq .flatMessage

-

Canal-Server 将binlog解析结果转为json;下游为ClickHouse Kafka Engine表时必须为true

canal.mq .flatMessage.onlyData

-

Canal-Server 只发送binlog解析结果中的data部分;下游为ClickHouse Kafka Engine表时必须为true

canal.mq .acks

acks

producer希望leader返回的用于确认请求完成的确认数量. 可选值 all, -1, 0 1. 默认值为all

二、新建相关的topic名称

根据Canal-Server里instance里配置文件instance.properties,注意分区数目与 canal.mq .partitionsNum 保持一致;

partition数目需要考虑以下因素:

1. 上游的MySQL的数据量。原则上数据写入量越大,应该分配更多的partition数目;

2. 考虑下游ClickHouse的实例数目。topic的partition分区总数 最好 不大于 下游ClickHouse的总实例数目,保证每个ClickHouse实例都能至少分配到一个partition;

ClickHouse服务配置

根据上游MySQL实例的表的schema新建数据表;

引入Kafka时需要额外新建Engine=Kafka的外表以及相关的物化视图表;

建议:

1. 为每个外表新增不同的 kafka_group_name,防止相互影响;

2. 设置kafka_skip_broken_messages 参数为合理值,遇到无法解析数据会跳过;

3. 设置合理的kafka_num_consumers值,最好保证所有ClickHouse实例该值的总和大于 topic的partition数目;

新建相关的分布式查询表;

服务启动

启动相关的Canal组件进程;

1. canal-server:  sh bin/startup.sh

2. canal-adapter: sh bin/startup.sh

在MySQL中插入数据,观察日志是否可以正常运行;

如果使用Kafka,可以通过kafka-console-consumer.sh脚本观察binlog数据解析;

观察ClickHouse数据表中是否正常写入数据;

实际案例

需求:实时同步MySQL实例的empdb_0.employees_20200801表和empdb_1.employees_20200802数据表

方案:使用方案二

环境及参数:

MySQL地址

172.21.48.35:3306

CKafka地址

172.21.48.11:9092

Canal instance名称

employees

Kafka目的topic

employees_topic 

1.在MySQL新建相关表

# MySQL表的建表语句

CREATE DATABASE `empdb_0`;

CREATE DATABASE `empdb_1`;

CREATE TABLE  `empdb_0`.`employees_20200801` (

`emp_no` int(11) NOT NULL,

`birth_date` date NOT NULL,

`first_name` varchar(14) NOT NULL,

`last_name` varchar(16) NOT NULL,

`gender` enum('M','F') NOT NULL,

`hire_date` date NOT NULL,

PRIMARY KEY (`emp_no`)

);

CREATE TABLE  `empdb_1`.`employees_20200802` (

`emp_no` int(11) NOT NULL,

`birth_date` date NOT NULL,

`first_name` varchar(14) NOT NULL,

`last_name` varchar(16) NOT NULL,

`gender` enum('M','F') NOT NULL,

`hire_date` date NOT NULL,

PRIMARY KEY (`emp_no`)

);

2. Canal-Server配置

步骤1. 修改conf/canal.properties文件

canal.serverMode = kafka

...

canal.destinations = example,employees

...

canal.mq.servers = 172.21.48.11:9092

canal.mq.retries = 0

canal.mq.batchSize = 16384

canal.mq.maxRequestSize = 1048576

canal.mq.lingerMs = 100

canal.mq.bufferMemory = 33554432

canal.mq.canalBatchSize = 50

canal.mq.canalGetTimeout = 100

canal.mq.flatMessage = true

canal.mq.flatMessage.onlyData = true

canal.mq.compressionType = none

canal.mq.acks = all

canal.mq.producerGroup = cdbproducer

canal.mq.accessChannel = local

...

步骤2. 新增employees实例,修改employees/instances.properties配置

...

canal.instance.master.address=172.21.48.35:3306

...

canal.instance.dbUsername=canal

canal.instance.dbPassword=canal

...

canal.instance.filter.regex=empdb_.*\\.employees_.*

...

canal.mq.dynamicTopic=employees_topic:empdb_.*\\.employees_.*

canal.mq.partition=0

canal.mq.partitionsNum=3

canal.mq.partitionHash=empdb_.*\\.employees_.*:emp_no

3. Kafka配置

4. 新增topic employees_topic,分区数为3

5. ClickHouse建表

CREATE DATABASE testckdb ON CLUSTER  default_cluster;

CREATE TABLE IF NOT EXISTS  testckdb.ck_employees ON CLUSTER default_cluster (

`emp_no` Int32,

`birth_date` String,

`first_name` String,

`last_name` String,

`gender` String,

`hire_date` String

) ENGINE=MergeTree() ORDER BY (emp_no)

SETTINGS index_granularity = 8192;

CREATE TABLE IF NOT EXISTS  testckdb.ck_employees_stream ON CLUSTER default_cluster (

`emp_no` Int32,

`birth_date` String,

`first_name` String,

`last_name` String,

`gender` String,

`hire_date` String

) ENGINE = Kafka()

SETTINGS

kafka_broker_list = '172.21.48.11:9092',

kafka_topic_list = 'employees_topic',

kafka_group_name = 'employees_group',

kafka_format = 'JSONEachRow',

kafka_skip_broken_messages = 1024,

kafka_num_consumers  = 1;

CREATE MATERIALIZED VIEW IF NOT EXISTS  testckdb.ck_employees_mv ON CLUSTER default_cluster TO testckdb.ck_employees(

`emp_no` Int32,

`birth_date` String,

`first_name` String,

`last_name` String,

`gender` String,

`hire_date`  String

) AS SELECT

`emp_no`,

`birth_date`,

`first_name`,

`last_name`,

`gender`,

`hire_date`

FROM

testckdb.ck_employees_stream;

CREATE TABLE IF NOT EXISTS  testckdb.ck_employees_dis ON CLUSTER default_cluster AS testckdb.ck_employees  

ENGINE=Distributed(default_cluster,  testckdb, ck_employees);

6. 启动Canal-Server服务

MySQL实例上游插入数据,观察数据是否在Canal-Server解析正常,是否在ClickHouse中完成同步。

手机运维小程序限时免费体验!

手机运维小程序——腾讯云数据库上线啦,从此在手机里可以实现实例信息查看,健康报告接收,慢SQL分析和异常查看等功能,以后回家终于可以不背电脑了!

3yMzIjV.jpg!mobile

QQ群号: 763628645

QQ群二维码如下, 添加请注明:姓名+地区+职位,否则不予通过

qIZJNzj.jpg!mobile


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK