clickhouse 基于集群实现分布式DDL的使用示例及坑
在ClickHouse中创建表、删表等DDL操作是一件麻烦的事,需要登录集群中的每一个节点去执行DDL语句,怎么简化这个操作呢?
ClickHouse(即CH)支持集群模式。 可以在DDL语句上附加ON CLUSTER <cluster_name>的语法,使得该DDL语句执行一次即可在集群中所有实例上都执行,简单方便。
一个集群拥有1到多个节点。
CREATE、ALTER、DROP、RENAME、TRUNCATE这些DDL语句,都支持分布式执行
【即如果在集群中的任意一个节点上执行DDL语句,那么集群中的每个节点都会以相同的顺序执行相同的语句,
这样就省去了需要依次去单个节点执行DDL的烦恼】
Distributed表创建示例
Distributed表引擎中分布式表的代名词,它本身不存储任何数据,而是作为数据分片(本地表)的透明代理,能够自动路由数据至集群中的各个节点,所以Distributed表引擎需要和其他数据表引擎一起协同工作。
注意:用{shard}
、{replica}
两个动态宏变量代替了硬编码,会根据集群的shard、replica的配置填充正确的值
Distributed表从实体表层面看,一张分片表由两部分组成:
- 本地表:通常以_local为后缀进行命名。本地表是承接数据的载体,可以使用非Distributed的任意表引擎,一张本地表对应了一个数据分片。
- 分布式表:通常以_all为后缀进行命名。分布式表只能使用Distributed表引擎,
- 它与本地表形成一对多的映射关系,
- 将通过分布式表代理操作多张本地表
Distributed表的分片规则:
分片键要求返回一个整型类型的取值,包括Int系统和UInt系列。
- 按某个字段的余数划分:Distributed(cluster,database,table,userId)
- 按照随机数划分:Distributed(cluster,database,table,rand())
- 按照某一个字段的散列值划分:Distributed(cluster,database,table,intHash64(userId))
注意: 对于分布式表与本地表之间的表结构的一致性检查,Distributed表引擎采用了读时检查的机制,这意味着如下:
- 如果它们的表结构不兼容,只有在查询时才会抛出错误
- 在创建表时不会进行表结构的一致性检查
插入数据后,可以看到数据均匀分布到各个shard上,在一个shard中,2个replica的数据是一致的。
创建表
create database test on cluster cluster_01;
create table events_local on cluster cluster_01 (
ID String,
EventType UInt8,
URL String,
EventTime DateTime
) ENGINE = ReplicatedMergeTree('/ch/tables/test/events_local/{shard}', '{replica}')
PARTITION BY toStartOfDay(EventTime)
ORDER BY (EventTime,EventType)
SETTINGS index_granularity = 8192
create table events on cluster cluster_01 (
ID String,
EventType UInt8,
URL String,
EventTime DateTime
) ENGINE = Distributed('cluster_01', 'test', 'events_local', rand())
插入数据
数据写入的原子性
CH内部所有的数据操作都是面向Block数据块的,所以Insert最终会将数据转换为Block数据块。
Insert语句在单个数据块的写入过程中具有原子性的。
在默认情况下,每个数据块最多可以写入1048576行数据(由max_insert_block_size参数控制),如果一条INSERT语句写入的数据少于max_insert_block_size行,那么这批数据的写入是具有原子性的,即要么全部成功,要么全部失败。
注意:只有在CH服务端处理数据的时候才具有这种原子写入的特性,如使用HTTP或JDBC接口时。因为max_insert_block_size参数在使用CLI命令行或者INSERT SELECT子句写入时不生效的。
数据块会去重
对于被多次写入的相同数据块(大小相同且具有相同顺序、相同行的数据块),该数据块仅会写入一次。
原因是:
- 万一在网络故障时,客户端应用程序不知道数据是否成功写入DB,此时可以简单地重复 INSERT ,把相同的数据发送给多个副本 INSERT 并不会有问题,因为这些 INSERT 是完全相同的(会被去重)。
- 注意:ReplicatedMergeTree表才会去重
insert into events(ID,EventType,URL,EventTime) values ('11',1,'http://www.baidu.com','2021-12-20 12:00:00');
insert into events(ID,EventType,URL,EventTime) values ('12',1,'http://www.baidu.com','2021-12-20 12:00:00');
insert into events(ID,EventType,URL,EventTime) values ('13',1,'http://www.baidu.com','2021-12-20 12:00:00');
insert into events(ID,EventType,URL,EventTime) values ('14',1,'http://www.baidu.com','2021-12-20 12:00:00');
insert into events(ID,EventType,URL,EventTime) values ('15',1,'http://www.baidu.com','2021-12-20 12:00:00');
insert into events(ID,EventType,URL,EventTime) values ('16',1,'http://www.baidu.com','2021-12-20 12:00:00');
查看各节点本地表的数据
ips=['172.25.xx.xx:9000','172.25.xx.xx:9000','172.25.xx.xx:9000','172.25.xx.xx:9001','172.25.xx.xx:9001','172.25.xx.xx:9001']
sql='select ip,ID,EventType,URL,EventTime from ( '
sub_sql = '''
select '$ip' as ip,toInt64(ID) as ID,EventType,URL,EventTime from remote('$ip','test','events_local')
where EventTime>'{startTime}' and EventTime<'{endTime}'
'''
for i, ip in enumerate(ips):
sql+=sub_sql.replace('$ip',ip)
if(i<(len(ips)-1)):
sql+= 'union all'
sql+=') a order by ID'
params = {"startTime": "2021-12-20 00:00:00", "endTime": "2021-12-21 00:00:00"}
executeSQL(sql.format(**params))
查看各节点数据分区情况
ips=['172.25.xx.xx:9000','172.25.xx.xx:9000','172.25.xx.xx:9000','172.25.xx.xx:9001','172.25.xx.xx:9001','172.25.xx.xx:9001']
sql=' select * from ('
sub_sql = '''
select '$ip' as ip,* from remote('$ip','system','parts','default','cde3VFR$')
where database ='ods' and table='events_local'
'''
for i, ip in enumerate(ips):
sql+=sub_sql.replace('$ip',ip)
if(i<(len(ips)-1)):
sql+= 'union all'
sql+=') a order by ip'
executeSQL(sql)
删除数据
on cluster cluster_01;
删除表、数据库
drop table if exists events_local on cluster cluster_01;
drop table if exists events on cluster cluster_01;
drop database if exists test on cluster cluster_01;
附录
ch 集群配置示例
3 shard 2 replicas配置示例如下:
假设有3台服务器node01、node02、node03,每个服务器开启2个ch实例,对应的端口分别是9000、9001
注意:replica定义中,可以定义如下信息
- 默认数据库
<default_database>test_01</default_database>
- 用户名及密码
<user>default</user><password>xxxxx</password>
<?xml version="1.0" encoding="utf-8"?>
<yandex>
<clickhouse_remote_servers>
<cluster_01>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node01</host>
<port>9000</port>
</replica>
<replica>
<host>node02</host>
<port>9001</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node02</host>
<port>9000</port>
</replica>
<replica>
<host>node03</host>
<port>9001</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node03</host>
<port>9000</port>
</replica>
<replica>
<host>node01</host>
<port>9001</port>
</replica>
</shard>
</cluster_01>
</clickhouse_remote_servers>
<clickhouse_compression>
<case>
<min_part_size>10000000000</min_part_size>
<min_part_size_ratio>0.01</min_part_size_ratio>
<method>lz4</method>
</case>
</clickhouse_compression>
</yandex>
ReplicatedMergeTree的表删除的坑
如果是ReplicatedMergeTree表,则删除时,仅仅删除的表结构,zookeeper上的数据是异步删除的,需要等较长时间才被删除,约20分钟。
通过系统表查看zookeeper上的数据
SELECT * FROM system.zookeeper WHERE path = '/ch/tables/test/events_local/03/replicas'
SELECT * FROM system.zookeeper WHERE path = '/ch/tables/test/events_local'
查看系统配置
select getSetting('max_insert_block_size')
如何开启distributed_ddl?
允许在集群上执行分布式DDL语句(CREATE, DROP, ALTER, RENAME)。
仅当ZooKeeper处于开启状态时有效。
如果不需要这样的功能,注释掉以下内容即可。
注释后,再执行distributed_ddl语句,会报 e.displayText() = DB::Exception: There is no DistributedDDL configuration in server config
的错误。
参数解释:
- path: 分布式DDL写入zk的路径,默认就是/clickhouse/task_queue/ddl
- cleanup_delay_period:检查DDL记录清理的间隔,单位为秒,默认60秒
- task_max_lifetime:分布式DDL记录可以保留的最大时长,单位为秒,默认保留7天
- max_tasks_in_queue:分布式DDL队列中可以保留的最大记录数,默认为1000条
- pool_size : 控制可以同时运行多少个distributed_ddl语句
- profile: 设置用于执行distributed_ddl语句的profile
<distributed_ddl>
<!-- Path in ZooKeeper to queue with DDL queries -->
<path>/clickhouse/task_queue/ddl</path>
<!-- Settings from this profile will be used to execute DDL queries -->
<profile>default</profile>
<!-- Controls how much ON CLUSTER queries can be run simultaneously. -->
<pool_size>1</pool_size>
<!-- Cleanup settings (active tasks will not be removed)-->
<!-- Controls task TTL (default 1 week) 单位:秒 -->
<task_max_lifetime>604800</task_max_lifetime>
<!-- Controls how often cleanup should be performed (in seconds) -->
<cleanup_delay_period>60</cleanup_delay_period>
<!-- Controls how many tasks could be in the queue -->
<max_tasks_in_queue>1000</max_tasks_in_queue>
</distributed_ddl>
相关文章
- Memcached集群/分布式/高可用 及 Magent缓存代理搭建过程 详解
- kubernetes集群角色管理
- eclipse连接远程hadoop集群开发时报错
- Dubbo+zookeeper构建高可用分布式集群(二)-集群部署
- Dubbo+zookeeper构建高可用分布式集群(一)-单机部署
- 到底什么是集群&分布式
- spring boot:使用redis cluster集群作为分布式session(redis 6.0.5/spring boot 2.3.1)
- Minio分布式集群示例:8个节点,每节点1块盘
- Hadoop 分布式集群搭建步骤
- E-MapReduce集群支持预装Phoenix
- 分布式与集群的区别是什么?
- 【转】分布式与集群的区别
- Hadoop快速入门——第二章、分布式集群(第四节、搭建开发环境)
- 一脸懵逼搭建Zookeeper分布式集群
- 通过ECK部署elasticsearch集群(k8s+elasticsearch+kibana)
- kubernetes集群搭建efk日志收集平台
- 【云原生之存储实战】部署Ceph分布式存储集群
- Kubernetes集群Configmap配置存储资源(三十六)
- 关闭Hadoop集群报错