zl程序教程

您现在的位置是:首页 >  工具

当前栏目

clickhouse 基于集群实现分布式DDL的使用示例及坑

集群分布式分布式 实现 基于 示例 ClickHouse DDL
2023-09-11 14:16:24 时间

在ClickHouse中创建表、删表等DDL操作是一件麻烦的事,需要登录集群中的每一个节点去执行DDL语句,怎么简化这个操作呢?

ClickHouse(即CH)支持集群模式。 可以在DDL语句上附加ON CLUSTER <cluster_name>的语法,使得该DDL语句执行一次即可在集群中所有实例上都执行,简单方便。

一个集群拥有1到多个节点。
CREATE、ALTER、DROP、RENAME、TRUNCATE这些DDL语句,都支持分布式执行
【即如果在集群中的任意一个节点上执行DDL语句,那么集群中的每个节点都会以相同的顺序执行相同的语句
这样就省去了需要依次去单个节点执行DDL的烦恼】

Distributed表创建示例

Distributed表引擎中分布式表的代名词,它本身不存储任何数据,而是作为数据分片(本地表)的透明代理,能够自动路由数据至集群中的各个节点,所以Distributed表引擎需要和其他数据表引擎一起协同工作

注意:用{shard}{replica}两个动态宏变量代替了硬编码,会根据集群的shard、replica的配置填充正确的值

Distributed表从实体表层面看,一张分片表由两部分组成:

  • 本地表:通常以_local为后缀进行命名。本地表是承接数据的载体,可以使用非Distributed的任意表引擎,一张本地表对应了一个数据分片。
  • 分布式表:通常以_all为后缀进行命名。分布式表只能使用Distributed表引擎,
    • 它与本地表形成一对多的映射关系,
    • 将通过分布式表代理操作多张本地表

Distributed表的分片规则:
分片键要求返回一个整型类型的取值,包括Int系统和UInt系列。

  • 按某个字段的余数划分:Distributed(cluster,database,table,userId)
  • 按照随机数划分:Distributed(cluster,database,table,rand())
  • 按照某一个字段的散列值划分:Distributed(cluster,database,table,intHash64(userId))

注意: 对于分布式表与本地表之间的表结构的一致性检查,Distributed表引擎采用了读时检查的机制,这意味着如下:

  • 如果它们的表结构不兼容,只有在查询时才会抛出错误
  • 在创建表时不会进行表结构的一致性检查

插入数据后,可以看到数据均匀分布到各个shard上,在一个shard中,2个replica的数据是一致的。

创建表

create database test on cluster cluster_01;

create table events_local on cluster cluster_01 (
	ID String,
	EventType UInt8,
	URL String,
	EventTime DateTime
) ENGINE = ReplicatedMergeTree('/ch/tables/test/events_local/{shard}', '{replica}')
PARTITION BY toStartOfDay(EventTime)
ORDER BY (EventTime,EventType)
SETTINGS index_granularity = 8192


create table events on cluster cluster_01 (
	ID String,
	EventType UInt8,
	URL String,
	EventTime DateTime
) ENGINE = Distributed('cluster_01', 'test', 'events_local', rand())

插入数据

数据写入的原子性

CH内部所有的数据操作都是面向Block数据块的,所以Insert最终会将数据转换为Block数据块。
Insert语句在单个数据块的写入过程中具有原子性的
在默认情况下,每个数据块最多可以写入1048576行数据(由max_insert_block_size参数控制),如果一条INSERT语句写入的数据少于max_insert_block_size行,那么这批数据的写入是具有原子性的,即要么全部成功,要么全部失败。

注意:只有在CH服务端处理数据的时候才具有这种原子写入的特性,如使用HTTP或JDBC接口时。因为max_insert_block_size参数在使用CLI命令行或者INSERT SELECT子句写入时不生效的。

数据块会去重

对于被多次写入的相同数据块(大小相同且具有相同顺序相同行的数据块),该数据块仅会写入一次
原因是:

  • 万一在网络故障时,客户端应用程序不知道数据是否成功写入DB,此时可以简单地重复 INSERT ,把相同的数据发送给多个副本 INSERT 并不会有问题,因为这些 INSERT 是完全相同的(会被去重)。
  • 注意:ReplicatedMergeTree表才会去重
insert into events(ID,EventType,URL,EventTime) values ('11',1,'http://www.baidu.com','2021-12-20 12:00:00');
insert into events(ID,EventType,URL,EventTime) values ('12',1,'http://www.baidu.com','2021-12-20 12:00:00');
insert into events(ID,EventType,URL,EventTime) values ('13',1,'http://www.baidu.com','2021-12-20 12:00:00');
insert into events(ID,EventType,URL,EventTime) values ('14',1,'http://www.baidu.com','2021-12-20 12:00:00');
insert into events(ID,EventType,URL,EventTime) values ('15',1,'http://www.baidu.com','2021-12-20 12:00:00');
insert into events(ID,EventType,URL,EventTime) values ('16',1,'http://www.baidu.com','2021-12-20 12:00:00');

查看各节点本地表的数据

ips=['172.25.xx.xx:9000','172.25.xx.xx:9000','172.25.xx.xx:9000','172.25.xx.xx:9001','172.25.xx.xx:9001','172.25.xx.xx:9001']
sql='select ip,ID,EventType,URL,EventTime from ( '
sub_sql = '''
select '$ip' as ip,toInt64(ID) as ID,EventType,URL,EventTime from remote('$ip','test','events_local') 
where EventTime>'{startTime}' and EventTime<'{endTime}'
'''
for i, ip in enumerate(ips):
    sql+=sub_sql.replace('$ip',ip)
    if(i<(len(ips)-1)):
        sql+= 'union all'
sql+=') a order by ID'
params = {"startTime": "2021-12-20 00:00:00", "endTime": "2021-12-21 00:00:00"}
executeSQL(sql.format(**params))

查看各节点数据分区情况

ips=['172.25.xx.xx:9000','172.25.xx.xx:9000','172.25.xx.xx:9000','172.25.xx.xx:9001','172.25.xx.xx:9001','172.25.xx.xx:9001']
sql=' select * from ('
sub_sql = '''
select '$ip' as ip,* from remote('$ip','system','parts','default','cde3VFR$') 
where database ='ods' and table='events_local'
'''
for i, ip in enumerate(ips):
    sql+=sub_sql.replace('$ip',ip)
    if(i<(len(ips)-1)):
        sql+= 'union all'
sql+=') a order by ip'
executeSQL(sql)

删除数据

on cluster cluster_01;

删除表、数据库

drop table if exists events_local on cluster cluster_01;

drop table if exists events on cluster cluster_01;

drop database if exists test on cluster cluster_01;

附录

ch 集群配置示例

3 shard 2 replicas配置示例如下:
假设有3台服务器node01、node02、node03,每个服务器开启2个ch实例,对应的端口分别是9000、9001

注意:replica定义中,可以定义如下信息

  • 默认数据库 <default_database>test_01</default_database>
  • 用户名及密码 <user>default</user><password>xxxxx</password>
<?xml version="1.0" encoding="utf-8"?>
<yandex>
    <clickhouse_remote_servers>
        <cluster_01>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>node01</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>node02</host>
                    <port>9001</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>node02</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>node03</host>
                    <port>9001</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>node03</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>node01</host>
                    <port>9001</port>
                </replica>
            </shard>
        </cluster_01>
    </clickhouse_remote_servers>
    <clickhouse_compression>
        <case>
            <min_part_size>10000000000</min_part_size>
            <min_part_size_ratio>0.01</min_part_size_ratio>
            <method>lz4</method>
        </case>
    </clickhouse_compression>
</yandex>

ReplicatedMergeTree的表删除的坑

如果是ReplicatedMergeTree表,则删除时,仅仅删除的表结构,zookeeper上的数据是异步删除的,需要等较长时间才被删除,约20分钟

通过系统表查看zookeeper上的数据

SELECT * FROM system.zookeeper WHERE path = '/ch/tables/test/events_local/03/replicas'
SELECT * FROM system.zookeeper WHERE path = '/ch/tables/test/events_local'

查看系统配置

select getSetting('max_insert_block_size')

如何开启distributed_ddl?

允许在集群上执行分布式DDL语句(CREATE, DROP, ALTER, RENAME)。
仅当ZooKeeper处于开启状态时有效
如果不需要这样的功能,注释掉以下内容即可。
注释后,再执行distributed_ddl语句,会报 e.displayText() = DB::Exception: There is no DistributedDDL configuration in server config 的错误。

参数解释:

  • path: 分布式DDL写入zk的路径,默认就是/clickhouse/task_queue/ddl
  • cleanup_delay_period:检查DDL记录清理的间隔,单位为秒,默认60秒
  • task_max_lifetime:分布式DDL记录可以保留的最大时长,单位为秒,默认保留7天
  • max_tasks_in_queue:分布式DDL队列中可以保留的最大记录数,默认为1000条
  • pool_size : 控制可以同时运行多少个distributed_ddl语句
  • profile: 设置用于执行distributed_ddl语句的profile
    <distributed_ddl>
        <!-- Path in ZooKeeper to queue with DDL queries -->
        <path>/clickhouse/task_queue/ddl</path>

        <!-- Settings from this profile will be used to execute DDL queries -->
        <profile>default</profile>

        <!-- Controls how much ON CLUSTER queries can be run simultaneously. -->
        <pool_size>1</pool_size>

        <!-- Cleanup settings (active tasks will not be removed)-->
        <!-- Controls task TTL (default 1 week) 单位:秒 -->
        <task_max_lifetime>604800</task_max_lifetime>

        <!-- Controls how often cleanup should be performed (in seconds) -->
        <cleanup_delay_period>60</cleanup_delay_period>

        <!-- Controls how many tasks could be in the queue -->
        <max_tasks_in_queue>1000</max_tasks_in_queue>
    </distributed_ddl>