StarRocks 提供 Apache Spark™ 连接器 (StarRocks Connector for Apache Spark™),支持通过 Spark 读取 StarRocks 中存储的数据。您可以使用 Spark 对读取到的数据进行复杂处理、机器学习等。
Spark 连接器支持三种数据读取方式:临时视图、Spark DataFrame 和 Spark RDD。
您可以使用 Spark SQL 在 StarRocks 表上创建临时视图,然后通过临时视图直接读取 StarRocks 表的数据。
您也可以将 StarRocks 表映射为 Spark DataFrame 或者 Spark RDD,然后从 Spark DataFrame 或者 Spark RDD 中读取数据。推荐使用 Spark DataFrame 来读取 StarRocks 中存储的数据。
-
当前版本只支持从 StarRocks 中读取数据,不支持从 Sink 写入数据到 StarRocks 中。
-
支持在 StarRocks 端完成数据过滤,从而减少数据传输量。
-
如果读取数据的开销比较大,可以通过合理的表设计和使用过滤条件,控制 Spark不要一次读取过多的数据,从而避免给磁盘和网络造成过大的 I/O 压力或影响正常的查询业务。
Spark 连接器 | Spark | StarRocks | Java | Scala |
---|---|---|---|---|
v1.0.0 | v2.x | v1.18 及以上 | v8 | v2.11 |
v1.0.0 | v3.x | v1.18 及以上 | v8 | v2.12 |
已部署 Spark。
-
下载 Spark 连接器编译包。
-
通过如下命令进行 Spark 连接器的编译:
-
如果 Spark 版本是 v2.x,则执行如下命令,默认编译的是配套 Spark v2.3.4 的连接器:
sh build.sh 2
-
如果 Spark 版本是 v3.x,则执行如下命令,默认编译的是配套 Spark v3.1.2 的连接器:
sh build.sh 3
-
-
编译完成后,
output/
路径下生成starrocks-spark2_2.11-1.0.0.jar
文件。将该文件拷贝至 Spark 的类文件路径 (Classpath) 下:- 如果您的 Spark 以
Local
模式运行,需要把该文件放在jars/
路径下。 - 如果您的 Spark 以
Yarn
模式运行,需要把该文件放在预安装程序包 (Pre-deployment Package) 里。
- 如果您的 Spark 以
把文件放置到指定位置后,才可以开始使用 Spark 连接器读取数据。
本小节描述您在使用 Spark 连接器读取数据的过程中需要配置的参数。
以下参数适用于 Spark SQL、Spark DataFrame、Spark RDD 三种读取方式。
参数名称 | 默认值 | 说明 |
---|---|---|
starrocks.fenodes | 无 | StarRocks 集群中 FE 的 HTTP 地址,格式为 <fe_host>:<fe_http_port> 。支持输入多个地址,使用逗号 (,) 分隔。 |
starrocks.table.identifier | 无 | StarRocks 表的名称,格式为 <database_name>.<table_name> . |
starrocks.request.retries | 3 | 向 StarRocks 发送请求的重试次数。 |
starrocks.request.connect.timeout.ms | 30000 | 向 StarRocks 发送请求的连接超时时间。 |
starrocks.request.read.timeout.ms | 30000 | 向 StarRocks 发送请求的读取超时时间。 |
starrocks.request.query.timeout.s | 3600 | 从 StarRocks 查询数据的超时时间。默认超时时间为 1 小时。-1 表示无超时限制。 |
starrocks.request.tablet.size | Integer.MAX_VALUE | 一个 Spark RDD 分区对应的 StarRocks Tablet 的个数。参数设置越小,生成的分区越多,Spark 侧的并行度也就越大,但与此同时会给 StarRocks 侧造成更大的压力。 |
starrocks.batch.size | 4096 | 单次从 BE 读取的最大行数。调大参数取值可减少 Spark 与 StarRocks 之间建立连接的次数,从而减轻网络延迟所带来的的额外时间开销。对于StarRocks 2.2及以后版本最小支持的batch size为4096,如果配置小于该值,则按4096处理 |
starrocks.exec.mem.limit | 2147483648 | 单个查询的内存限制。单位:字节。默认内存限制为 2 GB。 |
starrocks.deserialize.arrow.async | false | 是否支持把 Arrow 格式异步转换为 Spark 连接器迭代所需的 RowBatch。 |
starrocks.deserialize.queue.size | 64 | 异步转换 Arrow 格式的内部处理队列,当 starrocks.deserialize.arrow.async 为 true 时生效。 |
starrocks.filter.query | 无 | 指定过滤条件。多个过滤条件用 and 连接。StarRocks 根据指定的过滤条件完成对待读取数据的过滤。 |
以下参数仅适用于 Spark SQL 和 Spark DataFrame 读取方式。
参数名称 | 默认值 | 说明 |
---|---|---|
user | 无 | StarRocks 集群账号的用户名。 |
password | 无 | StarRocks 集群账号的用户密码。 |
starrocks.filter.query.in.max.count | 100 | 谓词下推中,IN 表达式支持的取值数量上限。如果 IN 表达式中指定的取值数量超过该上限,则 IN 表达式中指定的条件过滤在 Spark 侧处理。 |
以下参数仅适用于 Spark RDD 读取方式。
参数名称 | 默认值 | 说明 |
---|---|---|
starrocks.request.auth.user | 无 | StarRocks 集群账号的用户名。 |
starrocks.request.auth.password | 无 | StarRocks 集群账号的用户密码。 |
starrocks.read.field | 无 | 指定从 StarRocks 表中读取哪些列的数据。多个列名之间使用逗号 (,) 分隔。 |
StarRocks 数据类型 | Spark 数据类型 |
---|---|
BOOLEAN | DataTypes.BooleanType |
TINYINT | DataTypes.ByteType |
SMALLINT | DataTypes.ShortType |
INT | DataTypes.IntegerType |
BIGINT | DataTypes.LongType |
LARGEINT | DataTypes.StringType |
FLOAT | DataTypes.FloatType |
DOUBLE | DataTypes.DoubleType |
DECIMAL | DecimalType |
DATE | DataTypes.StringType |
DATETIME | DataTypes.StringType |
CHAR | DataTypes.StringType |
VARCHAR | DataTypes.StringType |
ARRAY | Unsupported datatype |
HLL | Unsupported datatype |
BITMAP | Unsupported datatype |
Spark 连接器中,将 DATE 和 DATETIME 数据类型映射为 STRING 数据类型。因为 StarRocks 底层存储引擎处理逻辑,直接使用 DATE 和 DATETIME 数据类型时,覆盖的时间范围无法满足需求。所以,使用 STRING 数据类型直接返回对应的时间可读文本。
假设您的 StarRocks 集群中已创建数据库 test
,并且您拥有 root 账号权限。
执行如下步骤,准备数据样例:
-
进入
test
数据库,创建一张名为score_board
的表。MySQL [test]> CREATE TABLE `score_board` ( `id` int(11) NOT NULL COMMENT "", `name` varchar(65533) NULL DEFAULT "" COMMENT "", `score` int(11) NOT NULL DEFAULT "0" COMMENT "" ) ENGINE=OLAP PRIMARY KEY(`id`) COMMENT "OLAP" DISTRIBUTED BY HASH(`id`) BUCKETS 1 PROPERTIES ( "replication_num" = "1" );
-
向
score_board
表中插入数据。MySQL [test]> INSERT INTO score_board values (1, 'Bob', 21), (2, 'Stan', 21), (3, 'Sam', 22), (4, 'Tony', 22), (5, 'Alice', 22), (6, 'Lucy', 23), (7, 'Polly', 23), (8, 'Tom', 23), (9, 'Rose', 24), (10, 'Jerry', 24), (11, 'Jason', 24), (12, 'Lily', 25), (13, 'Stephen', 25), (14, 'David', 25), (15, 'Eddie', 26), (16, 'Kate', 27), (17, 'Cathy', 27), (18, 'Judy', 27), (19, 'Julia', 28), (20, 'Robert', 28), (21, 'Jack', 29);
-
查询
score_board
表的数据。MySQL [test]> SELECT * FROM score_board; +------+---------+-------+ | id | name | score | +------+---------+-------+ | 1 | Bob | 21 | | 2 | Stan | 21 | | 3 | Sam | 22 | | 4 | Tony | 22 | | 5 | Alice | 22 | | 6 | Lucy | 23 | | 7 | Polly | 23 | | 8 | Tom | 23 | | 9 | Rose | 24 | | 10 | Jerry | 24 | | 11 | Jason | 24 | | 12 | Lily | 25 | | 13 | Stephen | 25 | | 14 | David | 25 | | 15 | Eddie | 26 | | 16 | Kate | 27 | | 17 | Cathy | 27 | | 18 | Judy | 27 | | 19 | Julia | 28 | | 20 | Robert | 28 | | 21 | Jack | 29 | +------+---------+-------+ 21 rows in set (0.01 sec)
-
进入 Spark 的可执行程序目录,运行如下命令:
sh spark-sql
-
在数据库
test
中的表score_board
上创建一个名为spark_starrocks
的临时视图:spark-sql> CREATE TEMPORARY VIEW spark_starrocks USING starrocks OPTIONS ( "starrocks.table.identifier" = "test.score_board", "starrocks.fenodes" = "<fe_host>:<fe_http_port>", "user" = "root", "password" = "" );
-
从临时视图中读取数据:
spark-sql> SELECT * FROM spark_starrocks;
返回数据如下:
1 Bob 21 2 Stan 21 3 Sam 22 4 Tony 22 5 Alice 22 6 Lucy 23 7 Polly 23 8 Tom 23 9 Rose 24 10 Jerry 24 11 Jason 24 12 Lily 25 13 Stephen 25 14 David 25 15 Eddie 26 16 Kate 27 17 Cathy 27 18 Judy 27 19 Julia 28 20 Robert 28 21 Jack 29 Time taken: 1.883 seconds, Fetched 21 row(s) 22/08/09 15:29:36 INFO thriftserver.SparkSQLCLIDriver: Time taken: 1.883 seconds, Fetched 21 row(s)
-
进入 Spark 的可执行程序目录,运行如下命令:
sh spark-shell
-
运行如下命令,在 Spark 中根据 StarRocks 数据库
test
中的表score_board
创建一个名为starrocksSparkDF
的 DataFrame:scala> val starrocksSparkDF = spark.read.format("starrocks") .option("starrocks.table.identifier", s"test.score_board") .option("starrocks.fenodes", s"<fe_host>:<fe_http_port>") .option("user", s"root") .option("password", s"") .load()
-
从 DataFrame 中读取数据。例如,如果要读取前 10 行数据,需要运行如下命令:
scala> starrocksSparkDF.show(10)
返回数据如下:
+---+-----+-----+ | id| name|score| +---+-----+-----+ | 1| Bob| 21| | 2| Stan| 21| | 3| Sam| 22| | 4| Tony| 22| | 5|Alice| 22| | 6| Lucy| 23| | 7|Polly| 23| | 8| Tom| 23| | 9| Rose| 24| | 10|Jerry| 24| +---+-----+-----+ only showing top 10 rows
说明
如果不指定要读取的行数,则默认读取前 20 行数据。
-
进入 Spark 的可执行程序目录,运行如下命令:
sh spark-shell
-
运行如下命令,在 Spark 中根据 StarRocks 数据库
test
中的表score_board
创建一个名为starrocksSparkRDD
的 RDD:scala> import com.starrocks.connector.spark._ scala> val starrocksSparkRDD = sc.starrocksRDD ( tableIdentifier = Some("test.score_board"), cfg = Some(Map( "starrocks.fenodes" -> "<fe_host>:<fe_http_port>", "starrocks.request.auth.user" -> "root", "starrocks.request.auth.password" -> "" )) )
-
从 RDD 中读取数据。例如,如果要读取前 10 个 元素 (Element) 的数据,需要运行如下命令:
scala> starrocksSparkRDD.take(10)
返回数据如下:
res0: Array[AnyRef] = Array([1, Bob, 21], [2, Stan, 21], [3, Sam, 22], [4, Tony, 22], [5, Alice, 22], [6, Lucy, 23], [7, Polly, 23], [8, Tom, 23], [9, Rose, 24], [10, Jerry, 24])
如果要读取整个 RDD,需要运行如下命令:
scala> starrocksSparkRDD.collect()
返回数据如下:
res1: Array[AnyRef] = Array([1, Bob, 21], [2, Stan, 21], [3, Sam, 22], [4, Tony, 22], [5, Alice, 22], [6, Lucy, 23], [7, Polly, 23], [8, Tom, 23], [9, Rose, 24], [10, Jerry, 24], [11, Jason, 24], [12, Lily, 25], [13, Stephen, 25], [14, David, 25], [15, Eddie, 26], [16, Kate, 27], [17, Cathy, 27], [18, Judy, 27], [19, Julia, 28], [20, Robert, 28], [21, Jack, 29])
使用 Spark 连接器从 StarRocks 读取数据的时候,可以通过 starrocks.filter.query
参数指定过滤条件来做合理的分区、分桶、前缀索引裁剪,减少拉取数据的开销。这里以 Spark DataFrame 为例进行介绍,通过查看执行计划来验证实际数据裁剪的效果。
组件 | 版本 |
---|---|
Spark | Spark v2.4.4 和 Scala v2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_302) |
StarRocks | v2.2.0 |
Spark Connector | starrocks-spark2_2.11-1.0.0.jar |
执行如下步骤,准备数据样例:
-
进入
test
数据库,创建一张名为mytable
的表。MySQL [test]> CREATE TABLE `mytable` ( `k` int(11) NULL COMMENT "bucket", `b` int(11) NULL COMMENT "", `dt` datetime NULL COMMENT "", `v` int(11) NULL COMMENT "" ) ENGINE=OLAP DUPLICATE KEY(`k`,`b`, `dt`) COMMENT "OLAP" PARTITION BY RANGE(`dt`) ( PARTITION p202201 VALUES [('2022-01-01 00:00:00'), ('2022-02-01 00:00:00')), PARTITION p202202 VALUES [('2022-02-01 00:00:00'), ('2022-03-01 00:00:00')), PARTITION p202203 VALUES [('2022-03-01 00:00:00'), ('2022-04-01 00:00:00')) ) DISTRIBUTED BY HASH(`k`) BUCKETS 3 PROPERTIES ( "replication_num" = "1" );
-
向
mytable
表中插入数据。MySQL [test]> INSERT INTO mytable values (1, 11, '2022-01-02 08:00:00', 111), (2, 22, '2022-02-02 08:00:00', 222), (3, 33, '2022-03-02 08:00:00', 333);
-
查询
mytable
表的数据。MySQL [test]> select * from mytable; +------+------+---------------------+------+ | k | b | dt | v | +------+------+---------------------+------+ | 1 | 11 | 2022-01-02 08:00:00 | 111 | | 2 | 22 | 2022-02-02 08:00:00 | 222 | | 3 | 33 | 2022-03-02 08:00:00 | 333 | +------+------+---------------------+------+ 3 rows in set (0.01 sec)
-
在 Spark 可执行程序目录下,通过如下命令,根据数据库
test
中的表mytable
创建一个名为df
的 DataFrame:scala> val df = spark.read.format("starrocks") .option("starrocks.table.identifier", s"test.mytable") .option("starrocks.fenodes", s"<fe_host>:<fe_http_port>") .option("user", s"root") .option("password", s"") .load()
-
查看 StarRocks 的 FE 日志文件 fe.log,找到 Spark 读取数据所用的 SQL 语句,如下所示:
2022-08-09 18:57:38,091 INFO (nioEventLoopGroup-3-10|196) [TableQueryPlanAction.executeWithoutPassword():126] receive SQL statement [select `k`,`b`,`dt`,`v` from `test`.`mytable`] from external service [ user ['root'@'%']] for database [test] table [mytable]
-
在 StarRocks 数据库下,使用 EXPLAIN 来获取 SELECT
k
,b
,dt
,v
fromtest
.mytable
语句的执行计划,如下所示:MySQL [test]> EXPLAIN select `k`,`b`,`dt`,`v` from `test`.`mytable`; +-----------------------------------------------------------------------+ | Explain String | +-----------------------------------------------------------------------+ | PLAN FRAGMENT 0 | | OUTPUT EXPRS:1: k | 2: b | 3: dt | 4: v | | PARTITION: UNPARTITIONED | | | | RESULT SINK | | | | 1:EXCHANGE | | | | PLAN FRAGMENT 1 | | OUTPUT EXPRS: | | PARTITION: RANDOM | | | | STREAM DATA SINK | | EXCHANGE ID: 01 | | UNPARTITIONED | | | | 0:OlapScanNode | | TABLE: mytable | | PREAGGREGATION: ON | | partitions=3/3 | | rollup: mytable | | tabletRatio=9/9 | | tabletList=41297,41299,41301,41303,41305,41307,41309,41311,41313 | | cardinality=3 | | avgRowSize=4.0 | | numNodes=0 | +-----------------------------------------------------------------------+ 26 rows in set (0.00 sec)
这里未作剪裁。因此,会扫描所有包含数据的 3 个分区 (partitions=3/3
)、以及这个三个分区中所有的 9 个 Tablet (tabletRatio=9/9
)。
-
在 Spark 可执行程序目录下,通过如下命令,根据数据库
test
中的表mytable
创建一个名为df
的 DataFrame,命令中使用starrocks.filter.query
参数指定过滤条件为dt='2022-0``1``-02 08:00:00
,以做分区剪裁:scala> val df = spark.read.format("starrocks") .option("starrocks.table.identifier", s"test.mytable") .option("starrocks.fenodes", s"<fe_host>:<fe_http_port>") .option("user", s"root") .option("password", s"") .option("starrocks.filter.query", "dt='2022-01-02 08:00:00'") .load()
-
查看 StarRocks 的 FE 日志文件 fe.log,找到 Spark 读取数据所用的 SQL 语句,如下所示:
2022-08-09 19:02:31,253 INFO (nioEventLoopGroup-3-14|204) [TableQueryPlanAction.executeWithoutPassword():126] receive SQL statement [select `k`,`b`,`dt`,`v` from `test`.`mytable` where dt='2022-01-02 08:00:00'] from external service [ user ['root'@'%']] for database [test] table [mytable]
-
在 StarRocks 数据库下,使用 EXPLAIN 来获取 SELECT
k
,b
,dt
,v
fromtest
.mytable
where dt='2022-01-02 08:00:00' 语句的执行计划,如下所示:MySQL [test]> EXPLAIN select `k`,`b`,`dt`,`v` from `test`.`mytable` where dt='2022-01-02 08:00:00'; +------------------------------------------------+ | Explain String | +------------------------------------------------+ | PLAN FRAGMENT 0 | | OUTPUT EXPRS:1: k | 2: b | 3: dt | 4: v | | PARTITION: UNPARTITIONED | | | | RESULT SINK | | | | 1:EXCHANGE | | | | PLAN FRAGMENT 1 | | OUTPUT EXPRS: | | PARTITION: RANDOM | | | | STREAM DATA SINK | | EXCHANGE ID: 01 | | UNPARTITIONED | | | | 0:OlapScanNode | | TABLE: mytable | | PREAGGREGATION: ON | | PREDICATES: 3: dt = '2022-01-02 08:00:00' | | partitions=1/3 | | rollup: mytable | | tabletRatio=3/3 | | tabletList=41297,41299,41301 | | cardinality=1 | | avgRowSize=20.0 | | numNodes=0 | +------------------------------------------------+ 27 rows in set (0.01 sec)
这里只做了分区剪裁,未做分桶剪裁。因此,会扫描 3 个分区里的 1 个分区 (partitions=1/3
)、以及该分区下的所有 Tablet (tabletRatio=3/3
)。
-
在 Spark 可执行程序目录下,通过如下命令,根据数据库
test
中的表mytable
创建一个名为df
的 DataFrame,命令中使用starrocks.filter.query
参数指定过滤条件为k=1
,以做分桶剪裁:scala> val df = spark.read.format("starrocks") .option("starrocks.table.identifier", s"test.mytable") .option("starrocks.fenodes", s"<fe_host>:<fe_http_port>") .option("user", s"root") .option("password", s"") .option("starrocks.filter.query", "k=1") .load()
-
查看 StarRocks 的 FE 日志文件 fe.log,找到 Spark 读取数据所用的 SQL 语句,如下所示:
2022-08-09 19:04:44,479 INFO (nioEventLoopGroup-3-16|208) [TableQueryPlanAction.executeWithoutPassword():126] receive SQL statement [select `k`,`b`,`dt`,`v` from `test`.`mytable` where k=1] from external service [ user ['root'@'%']] for database [test] table [mytable]
-
在 StarRocks 数据库下,使用 EXPLAIN 来获取 SELECT
k
,b
,dt
,v
fromtest
.mytable
where k=1 语句的执行计划,如下所示:MySQL [test]> EXPLAIN select `k`,`b`,`dt`,`v` from `test`.`mytable` where k=1; +------------------------------------------+ | Explain String | +------------------------------------------+ | PLAN FRAGMENT 0 | | OUTPUT EXPRS:1: k | 2: b | 3: dt | 4: v | | PARTITION: UNPARTITIONED | | | | RESULT SINK | | | | 1:EXCHANGE | | | | PLAN FRAGMENT 1 | | OUTPUT EXPRS: | | PARTITION: RANDOM | | | | STREAM DATA SINK | | EXCHANGE ID: 01 | | UNPARTITIONED | | | | 0:OlapScanNode | | TABLE: mytable | | PREAGGREGATION: ON | | PREDICATES: 1: k = 1 | | partitions=3/3 | | rollup: mytable | | tabletRatio=3/9 | | tabletList=41299,41305,41311 | | cardinality=1 | | avgRowSize=20.0 | | numNodes=0 | +------------------------------------------+ 27 rows in set (0.01 sec)
这里未做分区剪裁,只做了分桶裁剪。因此,会扫描所有包含数据的 3 个分区 (partitions=3/3
)、以及这 3 个分区下符合 k = 1
的 Hash 值的所有 3 个 Tablet (tabletRatio=3/9
)。
-
在 Spark 可执行程序目录下,通过如下命令,根据数据库
test
中的表mytable
创建一个名为df
的 DataFrame,命令中使用starrocks.filter.query
参数指定过滤条件为k=7
和dt='2022-0``1``-02 08:00:00'
,以做分区、分桶剪裁:scala> val df = spark.read.format("starrocks") .option("starrocks.table.identifier", s"test.mytable") .option("starrocks.fenodes", s"<fe_host>:<fe_http_port>") .option("user", s"") .option("password", s"") .option("starrocks.filter.query", "k=7 and dt='2022-01-02 08:00:00'") .load()
-
查看 StarRocks 的 FE 日志文件 fe.log,找到 Spark 读取数据所用的 SQL 语句,如下所示:
2022-08-09 19:06:34,939 INFO (nioEventLoopGroup-3-18|212) [TableQueryPlanAction.executeWithoutPassword():126] receive SQL statement [select `k`,`b`,`dt`,`v` from `test`.`mytable` where k=7 and dt='2022-01-02 08:00:00'] from external service [ user ['root'@'%']] for database [test] t able [mytable]
-
在 StarRocks 数据库下,使用 EXPLAIN 来获取 SELECT
k
,b
,dt
,v
fromtest
.mytable
where k=7 and dt='2022-01-02 08:00:00' 语句的执行计划,如下所示:MySQL [test]> EXPLAIN select `k`,`b`,`dt`,`v` from `test`.`mytable` where k=7 and dt='2022-01-02 08:00:00'; +----------------------------------------------------------+ | Explain String | +----------------------------------------------------------+ | PLAN FRAGMENT 0 | | OUTPUT EXPRS:1: k | 2: b | 3: dt | 4: v | | PARTITION: RANDOM | | | | RESULT SINK | | | | 0:OlapScanNode | | TABLE: mytable | | PREAGGREGATION: ON | | PREDICATES: 1: k = 7, 3: dt = '2022-01-02 08:00:00' | | partitions=1/3 | | rollup: mytable | | tabletRatio=1/3 | | tabletList=41301 | | cardinality=1 | | avgRowSize=20.0 | | numNodes=0 | +----------------------------------------------------------+ 17 rows in set (0.00 sec)
这里同时做了分区剪裁和分桶裁剪。因此,只会扫描 3 个分区中的 1 个分区 (partitions=1/3
)、以及该分区下的 1 个 Tablet (tabletRatio=1/3
)。
-
往 1 个分区插入更多数据,如下所示:
MySQL [test]> INSERT INTO mytable values (1, 11, "2022-01-02 08:00:00", 111), (3, 33, "2022-01-02 08:00:00", 333), (3, 33, "2022-01-02 08:00:00", 333), (3, 33, "2022-01-02 08:00:00", 333);
-
查询
mytable
表的数据,如下所示:MySQL [test]> SELECT * FROM mytable; +------+------+---------------------+------+ | k | b | dt | v | +------+------+---------------------+------+ | 1 | 11 | 2022-01-02 08:00:00 | 111 | | 1 | 11 | 2022-01-02 08:00:00 | 111 | | 3 | 33 | 2022-01-02 08:00:00 | 333 | | 3 | 33 | 2022-01-02 08:00:00 | 333 | | 3 | 33 | 2022-01-02 08:00:00 | 333 | | 2 | 22 | 2022-02-02 08:00:00 | 222 | | 3 | 33 | 2022-03-02 08:00:00 | 333 | +------+------+---------------------+------+ 7 rows in set (0.01 sec)
-
在 Spark 可执行程序目录下,通过如下命令,根据数据库
test
中的表mytable
创建一个名为df
的 DataFrame,命令中使用starrocks.filter.query
参数指定过滤条件为k=1
,以做前缀索引过滤:scala> val df = spark.read.format("starrocks") .option("starrocks.table.identifier", s"test.mytable") .option("starrocks.fenodes", s"<fe_host>:<fe_http_port>") .option("user", s"root") .option("password", s"") .option("starrocks.filter.query", "k=1") .load()
-
在 StarRocks 数据库下,开启 Profile 上报:
MySQL [test]> SET enable_profile = true; Query OK, 0 rows affected (0.00 sec)
-
使用浏览器打开
http://<fe_host>:<http_http_port>/query
页面,查看 SELECT * FROM mytable where k=1 语句的 Profile,如下所示:OLAP_SCAN (plan_node_id=0): CommonMetrics: - CloseTime: 1.255ms - OperatorTotalTime: 1.404ms - PeakMemoryUsage: 0.00 - PullChunkNum: 8 - PullRowNum: 2 - __MAX_OF_PullRowNum: 2 - __MIN_OF_PullRowNum: 0 - PullTotalTime: 148.60us - PushChunkNum: 0 - PushRowNum: 0 - PushTotalTime: 0ns - SetFinishedTime: 136ns - SetFinishingTime: 129ns UniqueMetrics: - Predicates: 1: k = 1 - Rollup: mytable - Table: mytable - BytesRead: 88.00 B - __MAX_OF_BytesRead: 88.00 B - __MIN_OF_BytesRead: 0.00 - CachedPagesNum: 0 - CompressedBytesRead: 844.00 B - __MAX_OF_CompressedBytesRead: 844.00 B - __MIN_OF_CompressedBytesRead: 0.00 - CreateSegmentIter: 18.582us - IOTime: 4.425us - LateMaterialize: 17.385us - PushdownPredicates: 3 - RawRowsRead: 2 - __MAX_OF_RawRowsRead: 2 - __MIN_OF_RawRowsRead: 0 - ReadPagesNum: 12 - __MAX_OF_ReadPagesNum: 12 - __MIN_OF_ReadPagesNum: 0 - RowsRead: 2 - __MAX_OF_RowsRead: 2 - __MIN_OF_RowsRead: 0 - ScanTime: 154.367us - SegmentInit: 95.903us - BitmapIndexFilter: 0ns - BitmapIndexFilterRows: 0 - BloomFilterFilterRows: 0 - ShortKeyFilterRows: 3 - __MAX_OF_ShortKeyFilterRows: 3 - __MIN_OF_ShortKeyFilterRows: 0 - ZoneMapIndexFilterRows: 0 - SegmentRead: 2.559us - BlockFetch: 2.187us - BlockFetchCount: 2 - __MAX_OF_BlockFetchCount: 2 - __MIN_OF_BlockFetchCount: 0 - BlockSeek: 7.789us - BlockSeekCount: 2 - __MAX_OF_BlockSeekCount: 2 - __MIN_OF_BlockSeekCount: 0 - ChunkCopy: 25ns - DecompressT: 0ns - DelVecFilterRows: 0 - IndexLoad: 0ns - PredFilter: 353ns - PredFilterRows: 0 - RowsetsReadCount: 7 - SegmentsReadCount: 3 - __MAX_OF_SegmentsReadCount: 2 - __MIN_OF_SegmentsReadCount: 0 - TotalColumnsDataPageCount: 8 - __MAX_OF_TotalColumnsDataPageCount: 8 - __MIN_OF_TotalColumnsDataPageCount: 0 - UncompressedBytesRead: 508.00 B - __MAX_OF_UncompressedBytesRead: 508.00 B - __MIN_OF_UncompressedBytesRead: 0.00
k = 1
能够命中前缀索引,因此,读取数据的过程中过滤掉 3 行 (ShortKeyFilterRows: 3
)。