zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

Doris总结-数据导入导出

2023-04-18 16:29:34 时间

1.数据导入导出

1.数据导入

  1. Broker load:将外部数据导入到Doris,入hdfs,为异步导入
  2. Stream Load:通过http协议导入,主要将文件导入到doris,为同步导入
    3.Routine Load:自动从数据源导入,只支持Kafka
    4.Binlog Load:通过监听binlog日志进行导入
  3. Insert Into :通过insert into select 或者insert into values(禁止使用) 的形式导入数据
  4. s3协议:类似于broker load,异步导入

1.Broker load

可以将HDFS中的文件导入到doris中,大数据量的场景下使用。
由FE生成对应的执行计划,分发给BE进行执行,BE会从文件系统拉取数据,对数据进行转换后导入到系统,反馈给FE结果,由FE确认是否完成。

语法在链接: 常用脚本文章中记录

2.Stream Load

通过发送http协议将文件导入到doris中,同步导入并返回结果,主要用来导入本地文件,支持csv和json格式。
用户提交导入请求后,会直接提交给FE,FE会将请求redirect一个BE进行处理,最终结果由BE返回。
语法:

curl --location-trusted -u user:passwd [-H ""...] -T data.file -
XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load

3.Routine Load

从数据源中监听导入数据,仅支持kafka,消息格式为csv或json。
用户提交请求到FE,FE会将作业拆分成多个Task,每个Task负责导入一部分,Task分配到BE执行,BE中Task作为一个Stream load任务导入,完成后想FE汇报,FE根据汇报结果继续生成新的Task,不断生成新的Task完成数据的持续导入。
语法:

CREATE ROUTINE LOAD test_db.kafka_test ON student_kafka
COLUMNS TERMINATED BY ",",
COLUMNS(id, name, age)
PROPERTIES
( "
desired_concurrent_number"="3",
"strict_mode" = "false"
)
FROM KAFKA
(
"kafka_broker_list"= "hadoop1:9092,hadoop2:9092,hadoop3:9092",
"kafka_topic" = "test_doris1",
"property.group.id"="test_doris_group",
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
"property.enable.auto.commit"="false"
);

4.Binlog Load

提供一种CDC功能,依赖Canal,伪装成从节点从主节点同步Binlog进行解析,Doris在获取解析好的数据。
前提是需要安装Canal并且Mysql开启Binlog

5.Insert Into

insert into支持两种写法,insert into select :建议使用。insert into values:不建议使用,Doris官方文档中有说明。
语法:

INSERT INTO tbl2 WITH LABEL label1 SELECT * FROM tbl3;

6.S3

Doris支持通过S3协议的形式导入数据,如百度云的 BOS、阿里云的OSS和腾讯云的 COS 等。

详见 官方文档

2.数据导出

1.Export导出:将指定表或分区以文本的形式导出到存储系统中,如:HDFS
2.查询结果导出:将查询的结果导出到存储系统中

1.Export导出

用户提交导出请求后,FE会将涉及到的Tablet进行统计,然后进行分组,每组生成一个查询计划,然后通过Broker将数据写入到指定存储系统中。

语法:

export table example_site_visit2
to "hdfs://mycluster/doris-export"
PROPERTIES
(
"label" = "mylabel",
"column_separator"="|",
"timeout" = "3600"
)
WITH BROKER "broker_name"
(
#HDFS 开启 HA 需要指定,还指定其他参数
"dfs.nameservices"="mycluster",
"dfs.ha.namenodes.mycluster"="nn1,nn2,nn3",
"dfs.namenode.rpc-address.mycluster.nn1"= "hadoop1:8020",
"dfs.namenode.rpc-address.mycluster.nn2"= "hadoop2:8020",
"dfs.namenode.rpc-address.mycluster.nn3"="hadoop3:8020",
"dfs.client.failover.proxy.provider.mycluster"="org.apache.hadoop
.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
);

2.查询结果导出

可以将查询结果导出到文件系统中,也可以导出到S3协议的文件系统中。

语法:

SELECT * FROM example_site_visit
INTO OUTFILE "hdfs://hadoop1:8020/doris-out/broker_a_"
FORMAT AS CSV
PROPERTIES
(
"broker.name" = "broker_name",
"column_separator" = ",",
"line_delimiter" = "
",
"max_file_size" = "100MB"
);