Broker Load 是一种基于 MySQL 协议的异步导入方式。您提交导入作业以后,StarRocks 会异步地执行导入作业。您需要通过 SHOW LOAD 语句或者 curl 命令来查看导入作业的结果。有关 Broker Load 的背景信息、前提条件、基本原理、以及支持的数据文件格式和外部存储系统等,请参见从 HDFS 或外部云存储系统导入数据。
LOAD LABEL [<database_name>.]<label_name>
(
data_desc[, data_desc ...]
)
WITH BROKER
[broker_properties]
[opt_properties];
注意在 StarRocks 中,部分文字是 SQL 语言的保留关键字,不能直接用于 SQL 语句。如果想在 SQL 语句中使用这些保留关键字,必须用反引号 (`) 包含起来。参见关键字。
指定导入作业的标签。
每个导入作业都对应一个在该数据库内唯一的标签。通过标签,可以查看对应导入作业的执行情况,并防止导入相同的数据。导入作业的状态为 FINISHED 时,其标签不可再复用给其他导入作业。导入作业的状态为 CANCELLED 时,其标签可以复用给其他导入作业,但通常都是用来重试同一个导入作业(即使用同一个标签导入相同的数据)以实现数据“精确一次 (Exactly-Once)”语义。
有关标签的命名规范,请参见系统限制。
用于描述一批次待导入的数据。每个 data_desc
声明了本批次待导入数据所属的数据源地址、ETL 函数、StarRocks 表和分区等信息。
Broker Load 支持一次导入多个数据文件。在一个导入作业中,您可以使用多个 data_desc
来声明导入多个数据文件,也可以使用一个 data_desc
来声明导入一个路径下的所有数据文件。Broker Load 还支持保证单次导入事务的原子性,即单次导入的多个数据文件都成功或者都失败,而不会出现部分导入成功、部分导入失败的情况。
data_desc
语法如下:
DATA INFILE ("<file_path>"[, "<file_path>" ...])
[NEGATIVE]
INTO TABLE <table_name>
[PARTITION (<partition_name>[, <partition_name> ...])]
[FORMAT AS "CSV | Parquet | ORC"]
[COLUMNS TERMINATED BY "<column_separator>"]
[(column_list)]
[COLUMNS FROM PATH AS (<partition_field_name>[, <partition_field_name> ...])]
[SET <k1=f1(v1)>[, <k2=f2(v2)> ...]]
[WHERE predicate]
data_desc
中的必选参数如下:
-
file_path
用于指定待导入数据文件所在的路径。
您可以指定导入一个具体的数据文件。例如,通过指定
"hdfs://<hdfs_host>:<hdfs_port>/user/data/tablename/20210411"
可以匹配 HDFS 服务器上/user/data/tablename
目录下名为20210411
的数据文件。您也可以用通配符指定导入某个路径下所有的数据文件。Broker Load 支持如下通配符:
?
、*
、[]
、{}
和^
。具体请参见通配符使用规则参考。例如, 通过指定"hdfs://<hdfs_host>:<hdfs_port>/user/data/tablename/*/*"
路径可以匹配 HDFS 服务器上/user/data/tablename
目录下所有分区内的数据文件,通过"hdfs://<hdfs_host>:<hdfs_port>/user/data/tablename/dt=202104*/*"
路径可以匹配 HDFS 服务器上/user/data/tablename
目录下所有202104
分区内的数据文件。说明
中间的目录也可以使用通配符匹配。
以 HDFS 数据源为例,文件路径中的
hdfs_host
和hdfs_port
参数说明如下:hdfs_host
:HDFS 集群中 NameNode 所在主机的 IP 地址。hdfs_port
:HDFS 集群中 NameNode 所在主机的 FS 端口。默认端口号为9000
。
-
INTO TABLE
用于指定目标 StarRocks 表的名称。
data_desc
中的可选参数如下:
-
NEGATIVE
用于撤销某一批已经成功导入的数据。如果想要撤销某一批已经成功导入的数据,可以通过指定
NEGATIVE
关键字来导入同一批数据。说明
该参数仅适用于目标 StarRocks 表使用聚合模型、并且所有 Value 列的聚合函数均为
sum
的情况。 -
PARTITION
指定要把数据导入哪些分区。如果不指定该参数,则默认导入到 StarRocks 表所在的所有分区中。
-
FORMAT AS
用于指定待导入数据文件的格式。取值包括
CSV
、Parquet
和ORC
。如果不指定该参数,则默认通过file_path
参数中指定的文件扩展名(.csv、.parquet、和 .orc)来判断文件格式。 -
COLUMNS TERMINATED BY
用于指定待导入数据文件中的列分隔符。如果不指定该参数,则默认列分隔符为
\t
,即 Tab。必须确保这里指定的列分隔符与待导入数据文件中的列分隔符一致;否则,导入作业会因数据质量错误而失败,作业状态 (State
) 会显示为CANCELLED
。需要注意的是,Broker Load 通过 MySQL 协议提交导入请求,除了 StarRocks 会做转义处理以外,MySQL 协议也会做转义处理。因此,如果列分隔符是 Tab 等不可见字符,则需要在列分隔字符前面多加一个反斜线 (\)。例如,如果列分隔符是
\t
,这里必须输入\\t
;如果列分隔符是\n
,这里必须输入\\n
。Apache Hive™ 文件的列分隔符为\x01
,因此,如果待导入数据文件是 Hive 文件,这里必须传入\\x01
。说明
StarRocks 支持设置长度最大不超过 50 个字节的 UTF-8 编码字符串作为列分隔符,包括常见的逗号 (,)、Tab 和 Pipe (|)。
-
column_list
用于指定待导入数据文件和 StarRocks 表之间的列对应关系。语法如下:
(<column_name>[, <column_name> ...])
。column_list
中声明的列与 StarRocks 表中的列按名称一一对应。说明
如果待导入数据文件的列和 StarRocks 表中的列按顺序一一对应,则不需要指定
column_list
参数。如果要跳过待导入数据文件中的某一列,只需要在
column_list
参数中将该列命名为 StarRocks 表中不存在的列名即可。具体请参见导入过程中实现数据转换。 -
COLUMNS FROM PATH AS
用于从指定的文件路径中提取一个或多个分区字段的信息。该参数仅当指定的文件路径中存在分区字段时有效。
例如,待导入数据文件所在的路径为
/path/col_name=col_value/file1
,其中col_name
可以对应到 StarRocks 表中的列。这时候,您可以设置参数为col_name
。导入时,StarRocks 会将col_value
落入col_name
对应的列中。说明
该参数只有在从 HDFS 导入数据时可用。
-
SET
用于将待导入数据文件的某一列按照指定的函数进行转化,然后将转化后的结果落入 StarRocks 表中。语法如下:
column_name = expression
。以下为两个示例:- StarRocks 表中有三列,按顺序依次为
col1
、col2
和col3
;待导入数据文件中有四列,前两列按顺序依次对应 StarRocks 表中的col1
、col2
列,后两列之和对应 StarRocks 表中的col3
列。这种情况下,需要通过column_list
参数声明(col1,col2,tmp_col3,tmp_col4)
,并使用 SET 子句指定SET (col3=tmp_col3+tmp_col4)
来实现数据转换。 - StarRocks 表中有三列,按顺序依次为
year
、month
和day
;待导入数据文件中只有一个包含时间数据的列,格式为yyyy-mm-dd hh:mm:ss
。这种情况下,需要通过column_list
参数声明(tmp_time)
、并使用 SET 子句指定SET (year = year(tmp_time), month=month(tmp_time), day=day(tmp_time))
来实现数据转换。
- StarRocks 表中有三列,按顺序依次为
-
WHERE
用于指定过滤条件,对做完转换的数据进行过滤。只有符合 WHERE 子句中指定的过滤条件的数据才会导入到 StarRocks 表中。
在 StarRocks v2.4 及以前版本,用于指定 Broker 的名称,格式为 WITH BROKER "<broker_name>"。自 StarRocks v2.5 起,只保留 WITH BROKER 关键字,不再需要提供 broker_name。
用于提供访问数据源的鉴权信息。数据源不同,需要提供的鉴权信息也不同。
社区版本的 HDFS,支持简单认证和 Kerberos 认证两种认证方式(Broker Load 默认使用简单认证),并且支持 NameNode 节点的 HA 配置。如果数据源为社区版本的 HDFS,可以提供如下配置信息:
-
认证方式
-
如果使用简单认证,需要指定如下配置:
"hadoop.security.authentication" = "simple" "username" = "<hdfs_username>", "password" = "<hdfs_password>"
上述配置中的参数说明如下表所述。
参数名称 参数说明 username 用于访问 HDFS 集群中 NameNode 节点的用户名。 password 用于访问 HDFS 集群中 NameNode 节点的密码。 -
如果使用 Kerberos 认证,需要指定如下配置:
"hadoop.security.authentication" = "kerberos", "kerberos_principal = "nn/[email protected]", "kerberos_keytab = "/keytab/hive.keytab", "kerberos_keytab_content = "YWFhYWFh"
上述配置中的参数说明如下表所述。
参数名称 参数说明 kerberos_principal 用于指定 Kerberos 的用户或服务 (Principal)。每个 Principal 在 HDFS 集群内唯一,由如下三部分组成: username
或servicename
:HDFS 集群中用户或服务的名称。instance
:HDFS 集群要认证的节点所在服务器的名称,用来保证用户或服务全局唯一。比如,HDFS 集群中有多个 DataNode 节点,各节点需要各自独立认证。realm
:域,必须全大写。
nn/[email protected]
。kerberos_keytab 用于指定 Kerberos 的 Key Table(简称为“keytab”)文件的路径。 kerberos_keytab_content 用于指定 Kerberos 中 keytab 文件的内容经过 Base64 编码之后的内容。该参数跟 kerberos_keytab
参数二选一配置。
使用 Kerberos 认证时,需要打开 Broker 进程的启动脚本文件 start_broker.sh,在文件 42 行附近修改如下信息让 Broker 进程读取 krb5.conf 文件信息:
export JAVA_OPTS="-Dlog4j2.formatMsgNoLookups=true -Xmx1024m -Dfile.encoding=UTF-8 -Djava.security.krb5.conf=/etc/krb5.conf"
说明
/etc/krb5.conf 文件路径根据实际情况进行修改,Broker 进程需要有权限读取该文件。部署多组 Broker 时,每组 Broker 均需要修改如下信息,重启后生效。
-
-
HA 配置
可以为 HDFS 集群中的 NameNode 节点配置 HA 机制,从而确保发生 NameNode 节点切换时,StarRocks 能够自动识别新切换到的 NameNode 节点。
目前 Broker 节点支持使用如下两种方式读取 HDFS 集群中节点的信息:-
将
hdfs-site.xml
文件放在 Broker 所在的每个节点的{deploy}/conf
目录下。Broker 进程重启时,会将{deploy_dir}/conf/
目录添加到CLASSPATH
环境变量的方式读取文件信息。 -
在创建 Broker Load 作业时增加如下 HA 配置:
"dfs.nameservices" = "ha_cluster", "dfs.ha.namenodes.ha_cluster" = "ha_n1,ha_n2", "dfs.namenode.rpc-address.ha_cluster.ha_n1" = "<hdfs_host>:<hdfs_port>", "dfs.namenode.rpc-address.ha_cluster.ha_n2" = "<hdfs_host>:<hdfs_port>", "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
上述配置中的参数说明如下表所述:
参数名称 参数说明 dfs.nameservices 自定义 HDFS 集群的名称。 dfs.ha.namenodes.XXX 自定义 NameNode 的名称,多个名称以逗号 (,) 分隔,双引号内不允许出现空格。
其中xxx
为dfs.nameservices
中自定义的HDFS 服务的名称。dfs.namenode.rpc-address.XXX.NN 指定 NameNode 的 RPC 地址信息。
其中NN
表示dfs.ha.namenodes.XXX
中自定义 NameNode 的名称。dfs.client.failover.proxy.provider 指定客户端连接的 NameNode 的提供者,默认为 org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
。 -
如果数据源为 Amazon S3,需要提供如下配置信息。
参数名称 | 参数说明 |
---|---|
fs.s3a.access.key | 访问 Amazon S3 存储空间的 Access Key ID。 |
fs.s3a.secret.key | 访问 Amazon S3 存储空间的 Secret Access Key。 |
fs.s3a.endpoint | 访问 Amazon S3 存储空间的连接地址。 |
请参见 AWS 官方文档访问密钥。
说明
- 从 Amazon S3 导入数据使用的是 S3A 协议,因此文件路径的前缀必须为
s3a://
。- 如果您的 Amazon EC2 实例上绑定的 IAM 角色可以访问您的 Amazon S3 存储空间,那么您不需要提供
fs.s3a.access.key
和fs.s3a.secret.key
配置,留空即可。
如果数据源为 Google CGS,需要提供如下配置信息。
参数名称 | 参数说明 |
---|---|
fs.s3a.access.key | 访问 Google CGS 存储空间的 Access Key。 |
fs.s3a.secret.key | 访问 Google CGS 存储空间的 Secret Key。 |
fs.s3a.endpoint | 访问 Google CGS 存储空间的连接地址。 |
创建访问 Google CGS 存储空间的密钥对的操作步骤如下:
-
登录 Google GCP。
-
在左侧导航栏,选择 Google Cloud Storage,然后选择 Settings。
-
选择 Interoperability 页签。
如果还没有启用 Interoperability 特性,请单击 Interoperable Access。
-
单击 Create new Key 按钮,按界面提示完成密钥对的创建。
如果数据源为阿里云 OSS,需要提供如下配置信息。
参数名称 | 参数说明 |
---|---|
fs.oss.accessKeyId | 访问阿里云 OSS 存储空间的 AccessKey ID,用于标识用户。 |
fs.oss.accessKeySecret | 访问阿里云 OSS 存储空间的 AccessKey Secret,是用于加密签名字符串和 OSS 用来验证签名字符串的密钥。 |
fs.oss.endpoint | 访问阿里云 OSS 存储空间的连接地址。 |
请参见阿里云官方文档用户签名验证。
如果数据源为腾讯云 COS,需要提供如下配置信息。
参数名称 | 参数说明 |
---|---|
fs.cosn.userinfo.secretId | 访问腾讯云 COS 存储空间的 SecretId,用于标识 API 调用者的身份。 |
fs.cosn.userinfo.secretKey | 访问腾讯云 COS 存储空间的 SecretKey,是用于加密签名字符串和服务端验证签名字符串的密钥。 |
fs.cosn.bucket.endpoint_suffix | 访问腾讯云 COS 存储空间的连接地址。 |
请参见腾讯云官方文档使用永久密钥访问 COS。
用于指定一些导入相关的可选参数,指定的参数设置作用于整个导入作业。语法如下:
PROPERTIES ("<key1>" = "<value1>"[, "<key2>" = "<value2>" ...])
参数说明如下:
-
timeout
导入作业的超时时间。单位:秒。默认超时时间为 4 小时。建议超时时间小于 6 小时。如果导入作业在设定的时限内未完成,会自动取消,变成 CANCELLED 状态。
说明
通常情况下,您不需要手动设置导入作业的超时时间。只有当导入作业无法在默认的超时时间内完成时,才推荐您手动设置导入作业的超时时间。
推荐超时时间大于下面公式的计算值:
超时时间 > (待导入数据文件的总大小 x 待导入数据文件及相关物化视图的个数)/(平均导入速度 x 导入并发数)
说明
-
“平均导入速度”是指目前 StarRocks 集群的平均导入速度。由于每个 StarRocks 集群的机器环境不同、且集群允许的并发查询任务数也不同,因此,StarRocks 集群的平均导入速度需要根据历史导入速度进行推测。
-
“导入并发数”可以通过
max_broker_concurrency
参数设置,具体请参见“从 HDFS 或外部云存储系统导入数据”文档中的“作业拆分与并行执行”章节。
例如,要导入一个 1 GB 的数据文件,该数据文件包含 2 个物化视图,当前 StarRocks 集群的平均导入速度为 10 MB/s,导入并发数为 3。在这种情况下,根据公式计算出来时长为 102 秒:
(1 x 1024 x 3)/(10 x 3) = 102(秒)
因此,导入作业的超时时间应该大于 102 秒。
-
-
max_filter_ratio
导入作业的最大容忍率,即导入作业能够容忍的因数据质量不合格而过滤掉的数据行所占的最大比例。取值范围:
0
~1
。默认值:0
。- 如果设置最大容忍率为
0
,则 StarRocks 在导入过程中不会忽略错误的数据行。当导入的数据行中有错误时,导入作业会失败,从而保证数据的正确性。 - 如果设置最大容忍率大于
0
,则 StarRocks 在导入过程中会忽略错误的数据行。这样,即使导入的数据行中有错误,导入作业也能成功。说明
这里因数据质量不合格而过滤掉的数据行,不包括通过 WHERE 子句过滤掉的数据行。
如果因为设置最大容忍率为
0
而导致作业失败,可以通过 SHOW LOAD 语句来查看导入作业的结果信息。然后,判断错误的数据行是否可以被过滤掉。如果可以被过滤掉,则可以根据结果信息中的dpp.abnorm.ALL
和dpp.norm.ALL
来计算导入作业的最大容忍率,然后调整后重新提交导入作业。计算公式如下:max_filter_ratio
= [dpp.abnorm.ALL
/(dpp.abnorm.ALL
+dpp.norm.ALL
)]dpp.abnorm.ALL
和dpp.norm.ALL
的总和就等于待导入的总行数。 - 如果设置最大容忍率为
-
load_mem_limit
导入作业的内存限制,最大不超过 BE 的内存限制。单位:字节。默认内存限制为 2 GB。
-
strict_mode
是否开启严格模式。取值范围:
true
和false
。默认值:false
。true
表示开启,false
表示关闭。严格模式是指在导入过程中对列的数据类型转换进行严格过滤。
如果开启了严格模式,在转换失败的情况下,会返回错误。如果关闭了严格模式,在转换失败的情况下,会导入
NULL
。严格过滤的策略在如下场景不生效:
- 导入的某列由函数转换生成。
- 导入的某列数据类型包含范围限制,原始数据行能正常通过数据类型转换、但无法通过范围限制。例如,某列的数据类型是 DECIMAL(1,0)、原始数据行的列值为
10
。
-
timezone
指定导入作业所使用的时区。默认为
Asia/Shanghai
时区。该参数会影响所有导入涉及的、跟时区设置有关的函数所返回的结果。受时区影响的函数有 strftime、alignment_timestamp 和 from_unixtime 等,具体请参见设置时区。导入参数timezone
设置的时区对应“设置时区”中所述的会话级时区。 -
priority
指定导入作业的优先级。取值范围:
LOWEST
、LOW
、NORMAL
、HIGH
和HIGHEST
。默认值:NORMAL
。Broker Load 通过 FE 配置项async_load_task_pool_size
指定任务线程池的大小,即 StarRocks 集群中可以并行执行的 Broker Load 任务的最大数量。如果某一时间段内提交的 Broker Load 作业的任务总数超过最大数量,则超出的作业会按照优先级在队列中排队等待调度。已经创建成功的导入作业,如果处于 QUEUEING 状态或者 LOADING 状态,那么您可以使用 ALTER LOAD 语句修改该作业的优先级。
-
merge_condition
用于指定作为更新生效条件的列名。只有当导入的数据中该列的值大于当前值的时候,更新才会生效。参见通过导入实现数据变更。
说明
指定的列必须为非主键列,且仅主键模型表支持条件更新。
本文以 HDFS 数据源为例,介绍各种导入配置。
本小节以 CSV 格式的数据为例,重点阐述在创建导入作业的时候,如何运用各种参数配置来满足不同业务场景下的各种导入要求。
StarRocks 数据库 test_db
里的表 table1
包含三列,按顺序依次为 col1
、col2
、col3
。
数据文件 example1.csv
也包含三列,按顺序一一对应 table1
中的三列。
如果要把 example1.csv
中所有的数据都导入到 table1
中,并且要求超时时间最大不超过 3600 秒,可以执行如下语句:
LOAD LABEL test_db.label1
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/input/example1.csv")
INTO TABLE table1
)
WITH BROKER
(
"username" = "<hdfs_username>",
"password" = "<hdfs_password>"
)
PROPERTIES
(
"timeout" = "3600"
);
StarRocks 数据库 test_db
里的表 table2
包含三列,按顺序依次为 col1
、col2
、col3
。
数据文件 example2.csv
也包含三列,按顺序一一对应 table2
中的三列。
如果要把 example2.csv
中所有的数据都导入到 table2
中,并且要求容错率最大不超过 0.1
,可以执行如下语句:
LOAD LABEL test_db.label2
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/input/example2.csv")
INTO TABLE table2
)
WITH BROKER
(
"username" = "<hdfs_username>",
"password" = "<hdfs_password>"
)
PROPERTIES
(
"max_filter_ratio" = "0.1"
);
StarRocks 数据库 test_db
里的表 table3
包含三列,按顺序依次为 col1
、col2
、col3
。
HDFS 集群的 /user/starrocks/data/input/
路径下所有数据文件也包含三列,按顺序一一对应 table3
中的三列,并且列分隔符为 Hive 文件的默认列分隔符 \x01
。
如果要把 hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/input/
路径下所有数据文件的数据都导入到 table3
中,可以执行如下语句:
LOAD LABEL test_db.label3
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/input/*")
INTO TABLE table3
COLUMNS TERMINATED BY "\\x01"
)
WITH BROKER
(
"username" = "<hdfs_username>",
"password" = "<hdfs_password>"
);
StarRocks 数据库 test_db
里的表 table4
包含三列,按顺序依次为 col1
、col2
、col3
。
数据文件 example4.csv
也包含三列,按顺序一一对应 table4
中的三列。
如果要把 example4.csv
中所有的数据都导入到 table4
中,并且要求使用 NameNode HA 机制,可以执行如下语句:
LOAD LABEL test_db.label4
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/input/example4.csv")
INTO TABLE table4
)
WITH BROKER
(
"username" = "<hdfs_username>",
"password" = "<hdfs_password>",
"dfs.nameservices" = "my_ha",
"dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
"dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
"dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
"dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
);
StarRocks 数据库 test_db
里的表 table5
包含三列,按顺序依次为 col1
、col2
、col3
。
数据文件 example5.csv
也包含三列,按顺序一一对应 table5
中的三列。
如果要把 example5.csv
中所有的数据都导入到 table5
中,并且要求使用 Kerberos 认证方式、提供 keytab 文件的路径,可以执行如下语句:
LOAD LABEL test_db.label5
(
DATA INFILE("hdfs://hdfs_host:hdfs_port/user/starRocks/data/input/old_file")
NEGATIVE
INTO TABLE table5
COLUMNS TERMINATED BY "\t"
)
WITH BROKER
(
"hadoop.security.authentication" = "kerberos",
"kerberos_principal" = "[email protected]",
"kerberos_keytab" = "/home/starRocks/starRocks.keytab"
);
StarRocks 数据库 test_db
里的表 table6
包含三列,按顺序依次为 col1
、col2
、col3
。
数据文件 example6.csv
也包含三列,按顺序一一对应 table6
中的三列。
并且,您已经通过 Broker Load 把 example6.csv
中所有的数据都导入到了 table6
中。
如果您想撤销已导入的数据,可以执行如下语句:
LOAD LABEL test_db.label6
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/input/example6.csv)
NEGATIVE
INTO TABLE table6
COLUMNS TERMINATED BY "\t"
)
WITH BROKER
(
"hadoop.security.authentication" = "kerberos",
"kerberos_principal" = "starrocks@YOUR.COM",
"kerberos_keytab" = "/home/starRocks/starRocks.keytab"
);
StarRocks 数据库 test_db
里的表 table7
包含三列,按顺序依次为 col1
、col2
、col3
。
数据文件 example7.csv
也包含三列,按顺序一一对应 table7
中的三列。
如果要把 example7.csv
中所有的数据都导入到 table7
所在的分区 p1
和 p2
,可以执行如下语句:
LOAD LABEL test_db.label7
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/input/example7.csv")
INTO TABLE table7
PARTITION (p1, p2)
COLUMNS TERMINATED BY ","
)
WITH BROKER
(
"username" = "<hdfs_username>",
"password" = "<hdfs_password>"
);
StarRocks 数据库 test_db
里的表 table8
包含三列,按顺序依次为 col1
、col2
、col3
。
数据文件 example8.csv
也包含三列,按顺序依次对应 table8
中 col2
、col1
、col3
。
如果要把 example3.csv
中所有的数据都导入到 table3
中,可以执行如下语句:
LOAD LABEL test_db.label8
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/input/example8.csv")
INTO TABLE table8
COLUMNS TERMINATED BY ","
(col2, col1, col3)
)
WITH BROKER
(
"username" = "<hdfs_username>",
"password" = "<hdfs_password>"
);
说明
上述示例中,因为
example8.csv
和table8
所包含的列不能按顺序依次对应,因此需要通过column_list
参数来设置example8.csv
和table8
之间的列映射关系。
StarRocks 数据库 test_db
里的表 table9
包含三列,按顺序依次为 col1
、col2
、col3
。
数据文件 example9.csv
也包含三列,按顺序一一对应 table9
中的三列。
如果只想把 example9.csv
中第一列的值大于 20180601
的数据行导入到 table9
中,可以执行如下语句:
LOAD LABEL test_db.label9
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/input/example9.csv")
INTO TABLE table9
(col1, col2, col3)
where col1 > 20180601
)
WITH BROKER
(
"username" = "<hdfs_username>",
"password" = "<hdfs_password>"
);
说明
上述示例中,虽然
example9.csv
和table9
所包含的列数目相同、并且按顺序一一对应,但是因为需要通过 WHERE 子句指定基于列的过滤条件,因此需要通过column_list
参数对example9.csv
中的列进行临时命名。
StarRocks 数据库 test_db
里的表 table10
包含四列,按顺序依次为 id
、col1
、col2
、col3
,其中 col1
和 col2
是 HLL 类型的列。
数据文件 example10.csv
包含三列,第一列对应 table10
中的 id
列;第二列和第三列分别对应 table10
中 HLL 类型的列 col1
和 col2
,可以通过函数转换成 HLL 类型的数据并分别落入 col1
、col2
列。
如果要把 example10.csv
中所有的数据都导入到 table10
中,可以执行如下语句:
LOAD LABEL test_db.label10
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/input/example10.csv")
INTO TABLE table10
COLUMNS TERMINATED BY ","
(id, temp1, temp2)
SET
(
col1 = hll_hash(temp1),
col2 = hll_hash(temp2),
col3 = empty_hll()
)
)
WITH BROKER
(
"username" = "<hdfs_username>",
"password" = "<hdfs_password>"
);
说明
上述示例中,通过
column_list
参数,把example10.csv
中的三列按顺序依次临时命名为id
、temp1
、temp2
,然后使用函数指定数据转换规则,包括:
使用
hll_hash
函数把example10.csv
中的temp1
、temp2
列转换成 HLL 类型的数据,并分别落入table10
中的col1
、col2
列。使用
empty_hll
函数给导入的数据行在table10
中的第四列补充默认值。
有关 hll_hash
函数和 hll_empty
函数的用法,请参见 HLL。
Broker Load 支持根据 StarRocks 表中定义的字段类型来解析待导入文件路径中的分区字段,类似 Apache Spark™ 中的分区发现 (Partition Discovery) 功能。
StarRocks 数据库 test_db
里的表 table11
包含五列,按顺序依次为 col1
、col2
、col3
、city
、utc_date
。
HDFS 集群的 /user/starrocks/data/input/dir/city=beijing
路径下包含如下数据文件:
-
/user/starrocks/data/input/dir/city=beijing/utc_date=2019-06-26/0000.csv
-
/user/starrocks/data/input/dir/city=beijing/utc_date=2019-06-26/0001.csv
这些数据文件都包含三列,按顺序分别对应 table11
中 col1
、col2
、col3
三列。
如果要把 /user/starrocks/data/input/dir/city=beijing/utc_date=*/*
路径下所有数据文件的数据都导入到 table11
中,并且要求提取路径中分区字段 city
和 utc_date
的信息落入 table11
中对应的 city
、utc_date
列,可以执行如下语句:
LOAD LABEL test_db.label11
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/input/dir/city=beijing/*/*")
INTO TABLE table11
FORMAT AS "csv"
(col1, col2, col3)
COLUMNS FROM PATH AS (city, utc_date)
SET (uniq_id = md5sum(k1, city))
)
WITH BROKER
(
"username" = "<hdfs_username>",
"password" = "<hdfs_password>"
);
在 HDFS 的文件路径中,不允许有冒号 (:),所有冒号 (:) 都会自动替换成 %3A
。
StarRocks 数据库 test_db
里的表 table12
包含三列,按顺序依次为 data_time
、col1
、col2
,表结构如下:
data_time DATETIME,
col1 INT,
col2 INT
HDFS 集群的 /user/starrocks/data
路径下有如下数据文件:
-
/user/starrocks/data/data_time=2020-02-17 00%3A00%3A00/example12.csv
-
/user/starrocks/data/data_time=2020-02-18 00%3A00%3A00/example12.csv
如果要把 example12.csv
中所有的数据都导入到 table12
中,并且要求提取指定路径中分区字段 data_time
的信息落入到 table12
中的 data_time
列,可以执行如下语句:
LOAD LABEL test_db.label12
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/*/example12.csv")
INTO TABLE table12
COLUMNS TERMINATED BY ","
FORMAT AS "csv"
(col1,col2)
COLUMNS FROM PATH AS (data_time)
SET (data_time = str_to_date(data_time, '%Y-%m-%d %H%%3A%i%%3A%s'))
)
WITH BROKER
(
"username" = "<hdfs_username>",
"password" = "<hdfs_password>"
);
上述示例中,因为直接提取后的分区字段 data_time
是包含 %3A
的字符串(如 2020-02-17 00%3A00%3A00
),因此需要再通过 str_to_date
函数把字符串转换为 DATETIME 类型的数据以后才能落入 table8
中的 data_time
列。
本小节主要描述导入 Parquet 格式的数据时,需要关注的一些参数配置。
StarRocks 数据库 test_db
里的表 table13
包含三列,按顺序依次为 col1
、col2
、col3
。
数据文件 example13.parquet
也包含三列,按顺序一一对应 table13
中的三列。
如果要把 example13.parquet
中所有的数据都导入到 table13
中,可以执行如下语句:
LOAD LABEL test_db.label13
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/input/example13.parquet")
INTO TABLE table13
FORMAT AS "parquet"
(col1, col2, col3)
)
WITH BROKER
(
"username" = "<hdfs_username>",
"password" = "<hdfs_password>"
);
说明
导入 Parquet 格式的数据时,默认通过文件扩展名 (.parquet) 判断数据文件的格式。如果文件名称中没有包含扩展名,则必须通过
FORMAT AS
参数指定数据文件格式为Parquet
。
本小节主要描述导入 ORC 格式的数据时,需要关注的一些参数配置。
StarRocks 数据库 test_db
里的表 table14
包含三列,按顺序依次为 col1
、col2
、col3
。
数据文件 example14.orc
也包含三列,按顺序一一对应 table14
中的三列。
如果要把 example14.orc
中所有的数据都导入到 table14
中,可以执行如下语句:
LOAD LABEL test_db.label14
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/input/example14.orc")
INTO TABLE table14
FORMAT AS "orc"
(col1, col2, col3)
)
WITH BROKER
(
"username" = "<hdfs_username>",
"password" = "<hdfs_password>"
);
说明
导入 ORC 格式的数据时,默认通过文件扩展名 (.orc) 判断数据文件的格式。如果文件名称中没有包含扩展名,则必须通过
FORMAT AS
参数指定数据文件格式为ORC
。StarRocks v2.3 及之前版本,当数据文件中包含 ARRAY 类型的列时,必须确保数据文件和 StarRocks 表中对应的列同名,并且不能写在 SET 子句里。