- Paimon支持Spark 3.5, 3.4, 3.3, 3.2 和 3.1版本
Version | Jar |
---|---|
Spark 3.5 | paimon-spark-3.5-0.9-SNAPSHOT.jar |
Spark 3.4 | paimon-spark-3.4-0.9-SNAPSHOT.jar |
Spark 3.3 | paimon-spark-3.3-0.9-SNAPSHOT.jar |
Spark 3.2 | paimon-spark-3.2-0.9-SNAPSHOT.jar |
Spark 3.1 | paimon-spark-3.1-0.9-SNAPSHOT.jar |
- 也可以从源码手动构建paimon spark jar,从[clone the git repository
mvn clean install -DskipTests
- 步骤1:指定paimon jar
# 通过jars参数或者packages参数指定paimon jar,使用packages需要将jar包防止spark/jars下
spark-sql ... --jars /path/to/paimon-spark-3.3-0.9-SNAPSHOT.jar
spark-sql ... --packages org.apache.paimon:paimon-spark-3.3:0.9-SNAPSHOT
- 步骤2:指定paimon catalog
# 启动Spark-SQL时,使用以下命令将Paimon的Spark Catalog注册为Paimon。仓库的表文件存储在/tmp/paimon下。
spark-sql ... \
--conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
--conf spark.sql.catalog.paimon.warehouse=file:/tmp/paimon \
--conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
# spark.sql.catalog.(catalog_name),如上配置catalog名称在catalog.之后
- 步骤3:完成的启动命令
spark-sql --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
--conf spark.sql.catalog.paimon.warehouse=file:/Users/huangshimin/Documents/study/flink/paimonData \
--conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions \
--jars /Users/huangshimin/Documents/study/spark/paimon-spark-3.4-0.9-20240801.002500-46.jar
- 步骤4:切换catalog、database,执行创建、插入、查询操作
use paimon;
use default;
create table my_table (
k int,
v string
) tblproperties (
'primary-key' = 'k'
);
INSERT INTO my_table VALUES (1, 'Hi'), (2, 'Hello');
select * from my_table;
- paimon catalog当前支持三种类型的metastore
filesystem metastore
:默认metastore,它在文件系统中存储元数据和表文件。hive metastore
:它还将元数据存储在Hive metastore中。用户可以直接从Hive访问这些表。jdbc metastore
:它额外地将元数据存储在关系数据库中,如MySQL, Postgres等。
- 创建一个catalog名为
paimon
,数据存储在hdfs:///path/to/warehouse
的catalog
spark-sql ... \
--conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
--conf spark.sql.catalog.paimon.warehouse=hdfs:///path/to/warehouse
- 切换catalog
USE paimon.default;
- 通过使用Paimon Hive catalog,对catalog的更改将直接影响到相应的Hive metastore。在这样的catalog中创建的表也可以直接从Hive访问。要使用Hive catalog, Database name, Table name和Field name应该是小写的。
spark-sql ... \
--conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
--conf spark.sql.catalog.paimon.warehouse=hdfs:///path/to/warehouse \
--conf spark.sql.catalog.paimon.metastore=hive \
--conf spark.sql.catalog.paimon.uri=thrift://<hive-metastore-host-name>:<port>
- 通过使用Paimon JDBC Catalog,对Catalog的更改将直接存储在SQLITE、MySQL、Postgres等关系数据库中。
- 目前,Lock配置只支持MySQL和SQLite。如果您使用不同类型的数据库进行Catalog存储,请不要配置
lock.enabled
。 - Spark中的Paimon JDBC Catalog需要正确添加相应的连接数据库的jar包。应该首先下载JDBC连接器的jar并将其添加到classpath中。例如MySQL, postgres
database type | Bundle Name | SQL Client JAR |
---|---|---|
mysql | mysql-connector-java | Download |
postgres | postgresql | Download |
spark-sql ... \
--conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
--conf spark.sql.catalog.paimon.warehouse=hdfs:///path/to/warehouse \
--conf spark.sql.catalog.paimon.metastore=jdbc \
--conf spark.sql.catalog.paimon.uri=jdbc:mysql://<host>:<port>/<databaseName> \
--conf spark.sql.catalog.paimon.jdbc.user=... \
--conf spark.sql.catalog.paimon.jdbc.password=...
- 在使用Paimon catalog之后,可以创建和删除表。在Paimon Catalogs中创建的表由Catalog管理。当表从Catalog中删除时,它的表文件也将被删除。
-- 在paimon catalog下创建一个表my_table,其主键为dt、hh、user_id
CREATE TABLE my_table (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING
) TBLPROPERTIES (
'primary-key' = 'dt,hh,user_id'
);
-- 创建一个分区表
CREATE TABLE my_table (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING
) PARTITIONED BY (dt, hh) TBLPROPERTIES (
'primary-key' = 'dt,hh,user_id'
);
CREATE TABLE my_table (
user_id BIGINT,
item_id BIGINT
);
-- 使用my_table的查询结果来创建一个my_table_as
CREATE TABLE my_table_as AS SELECT * FROM my_table;
-- 分区表
CREATE TABLE my_table_partition (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING
) PARTITIONED BY (dt, hh);
-- 使用my_table_partition的结果来创建一个以dt分区的分区表my_table_partition_as
CREATE TABLE my_table_partition_as PARTITIONED BY (dt) AS SELECT * FROM my_table_partition;
-- 修改表属性
CREATE TABLE my_table_options (
user_id BIGINT,
item_id BIGINT
) TBLPROPERTIES ('file.format' = 'orc');
-- 修改表文件格式orc为parquet
CREATE TABLE my_table_options_as TBLPROPERTIES ('file.format' = 'parquet') AS SELECT * FROM my_table_options;
-- 主键表
CREATE TABLE my_table_pk (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING
) TBLPROPERTIES (
'primary-key' = 'dt,hh,user_id'
);
-- 基于老表创建一个新的主键表
CREATE TABLE my_table_pk_as TBLPROPERTIES ('primary-key' = 'dt') AS SELECT * FROM my_table_pk;
-- 主键+分区
CREATE TABLE my_table_all (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING
) PARTITIONED BY (dt, hh) TBLPROPERTIES (
'primary-key' = 'dt,hh,user_id'
);
CREATE TABLE my_table_all_as PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' = 'dt,hh') AS SELECT * FROM my_table_all;
INSERT { INTO | OVERWRITE } table_identifier [ part_spec ] [ column_list ] { value_expr | query };
- 使用INSERT INTO将记录和更改写入到表中。
INSERT INTO my_table SELECT ...
- 使用' INSERT OVERWRITE '来覆盖整个未分区表。
INSERT OVERWRITE my_table SELECT ...
- 使用
INSERT OVERWRITE
覆盖整个分区
INSERT OVERWRITE my_table PARTITION (key1 = value1, key2 = value2, ...) SELECT ...
- spark默认overwrite模式为
static
分区覆盖,开启动态分区覆盖需要设置spark.sql.sources.partitionOverwriteMode
为dynamic
CREATE TABLE my_table (id INT, pt STRING) PARTITIONED BY (pt);
INSERT INTO my_table VALUES (1, 'p1'), (2, 'p2');
-- static overwrite,会覆盖全部分区
INSERT OVERWRITE my_table VALUES (3, 'p1');
SELECT * FROM my_table;
/*
+---+---+
| id| pt|
+---+---+
| 3| p1|
+---+---+
*/
-- Dynamic overwrite (只覆盖 pt='p1')
SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE my_table VALUES (3, 'p1');
SELECT * FROM my_table;
/*
+---+---+
| id| pt|
+---+---+
| 2| p2|
| 3| p1|
+---+---+
*/
TRUNCATE TABLE my_table;
- spark 支持修改基本类型和结构类型
-- Syntax
UPDATE table_identifier SET column1 = value1, column2 = value2, ... WHERE condition;
CREATE TABLE t (
id INT,
s STRUCT<c1: INT, c2: STRING>,
name STRING)
TBLPROPERTIES (
'primary-key' = 'id',
'merge-engine' = 'deduplicate'
);
-- you can use
UPDATE t SET name = 'a_new' WHERE id = 1;
UPDATE t SET s.c2 = 'a_new' WHERE s.c1 = 1;
DELETE FROM my_table WHERE currency = 'UNKNOWN';
- Paimon当前支持
Merge into
语法在Spark 3+,它允许在一次提交中基于source表进行一系列更新、插入和删除操作
- 只支持主键表
- 不支持更新主键
WHEN NOT MATCHED BY SOURCE
语法不支持
案例1:
-- 如果target和source中a字段相同的行存在,则全部修改,否则插入
MERGE INTO target
USING source
ON target.a = source.a
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
案例2:
MERGE INTO target
USING source
ON target.a = source.a
-- 当目标表a为5是,则更新b为source.b + target.b
WHEN MATCHED AND target.a = 5 THEN
UPDATE SET b = source.b + target.b
WHEN MATCHED AND source.c > 'c2' THEN
UPDATE SET *
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED AND c > 'c9' THEN
INSERT (a, b, c) VALUES (a, b * 1.1, c)
WHEN NOT MATCHED THEN
INSERT *
Paimon 当前支持Spark 3+对于流式写入
Paimon Structured Streaming only supports the two
append
andcomplete
modes.
// Create a paimon table if not exists.
spark.sql(s"""
|CREATE TABLE T (k INT, v STRING)
|TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
|""".stripMargin)
// Here we use MemoryStream to fake a streaming source.
val inputData = MemoryStream[(Int, String)]
val df = inputData.toDS().toDF("k", "v")
// Streaming Write to paimon table.
val stream = df
.writeStream
.outputMode("append")
.option("checkpointLocation", "/path/to/checkpoint")
.format("paimon")
.start("/path/to/paimon/sink/table")
- Schema evolution是一种允许用户轻松修改表的当前schema以适应现有数据或随时间变化的新数据的特性,同时保持数据的完整性和一致性。
- Paimon支持在写入数据时自动合并source数据和当前表数据的schema,并将合并后的schema用作表的最新schema,只需要配置
write.merge-schema
为true
即可。
data.write
.format("paimon")
.mode("append")
.option("write.merge-schema", "true")
.save(location)
- 当开启
write.merge-schema
时,默认情况下,Paimon允许用户对表schema执行以下操作: - 添加新列
- 修改列类型(e.g. Int -> Long)
- Paimon还支持特定类型之间的显式类型转换(e.g. String -> Date, Long -> Int),它需要显式配置
write.merge-schema.explicit-cast
为true
- Schema evolution可以同时用于流模式。
val inputData = MemoryStream[(Int, String)]
inputData
.toDS()
.toDF("col1", "col2")
.writeStream
.format("paimon")
.option("checkpointLocation", "/path/to/checkpoint")
.option("write.merge-schema", "true")
.option("write.merge-schema.explicit-cast", "true")
.start(location)
Scan Mode | Description |
---|---|
write.merge-schema | If true, merge the data schema and the table schema automatically before write data. |
write.merge-schema.explicit-cast | If true, allow to merge data types if the two types meet the rules for explicit casting. |
- Paimon的批量读取返回表快照中的所有数据。默认情况下,批量读取返回最新的快照。
- time travel的Paimon批量读可以指定一个snapshot-id或一个tag,并读取相应的数据,需要Spark3.3+版本
-- 根据快照id读取数据
SELECT * FROM t VERSION AS OF 1;
-- 根据写入时间读取最近快照
SELECT * FROM t TIMESTAMP AS OF '2023-06-01 00:00:00.123';
-- 根据时间戳读取最近快照
SELECT * FROM t TIMESTAMP AS OF 1678883047;
-- read tag 'my-tag'
SELECT * FROM t VERSION AS OF 'my-tag';
-- read the snapshot from specified watermark. will match the first snapshot after the watermark
SELECT * FROM t VERSION AS OF 'watermark-1678883047356';
- 读取开始快照(不包含)和结束快照之间的增量变化,例如:
- ‘5,10’ 读取快照5和快照10的最新的数据
- ‘TAG1,TAG3’ 读取
TAG1
和TAG3
的最新的数据
- 默认情况下,将扫描changelog文件以查找生成changelog文件的表。否则,扫描新修改的文件。指定
ncremental-between-scan-mode
指定模式。要求Spark3.2+版本; - Paimon支持使用Spark SQL进行增量查询,而增量查询是由Spark表值函数实现的。
SELECT * FROM paimon_incremental_query('test1', 1, 5);
-- 增量查询不支持查询DELETE数据,如果想要查询DELETE数据,可以查询系统表audit_log
SELECT * FROM `test1$audit_log`;
Paimon currently supports Spark 3.3+ for streaming read.
- Paimon支持的流式多去模式
Scan Mode | Description |
---|---|
latest | 对于流式数据源,连续地读取最新的更改,而不是在开始时生成快照。 |
latest-full | 对于流式数据源, 在第一次启动时在表上生成最新的快照,并继续读取最新的更改。 |
from-timestamp | 对于流式数据源,从“scan.timestamp-millis”指定的时间戳开始连续读取更改。不会在开始时生成快照。 |
from-snapshot | 对于流式数据源,从“scan.snapshot-id”指定的快照开始连续读取更改,开始时不生成快照。 |
from-snapshot-full | 对于流式数据源,生成由“scan.snapshot-id”指定的快照。在第一次启动时在表上读取,并持续读取更改。 |
default | It is equivalent to from-snapshot if "scan.snapshot-id" is specified. It is equivalent to from-timestamp if "timestamp-millis" is specified. Or, It is equivalent to latest-full. |
-
建议与查询一起指定分区和主键过滤,这将加快查询的Data Skipping,可以加速数据跳变的过滤函数如下:
-
=
-
<
-
<=
-
>
-
>=
-
IN (...)
-
LIKE 'abc%'
-
IS NULL
-
-
Paimon将按主键对数据进行排序,这加快了点查询和范围查询的速度。当使用复合主键时,查询过滤器最好在主键的最左边形成一个前缀,以获得良好的加速,具体如下:
-- 创建复合主键表
CREATE TABLE orders (
catalog_id BIGINT,
order_id BIGINT,
.....,
) TBLPROPERTIES (
'primary-key' = 'catalog_id,order_id'
);
-- 走复合主键查询优化
SELECT * FROM orders WHERE catalog_id=1025;
SELECT * FROM orders WHERE catalog_id=1025 AND order_id=29495;
SELECT * FROM orders
WHERE catalog_id=1025
AND order_id>2035 AND order_id<6000;
-- 不触发复合主键查询优化
SELECT * FROM orders WHERE order_id=29495;
SELECT * FROM orders WHERE catalog_id=1025 OR order_id=29495;
- 修改表
my_table
的write-buffer-size
为256MB
ALTER TABLE my_table SET TBLPROPERTIES (
'write-buffer-size' = '256 MB'
);
ALTER TABLE my_table UNSET TBLPROPERTIES ('write-buffer-size');
ALTER TABLE my_table SET TBLPROPERTIES (
'comment' = 'table comment'
);
ALTER TABLE my_table UNSET TBLPROPERTIES ('comment');
ALTER TABLE my_table RENAME TO my_table_new;
ALTER TABLE [catalog.[database.]]test1 RENAME to [database.]test2;
ALTER TABLE catalog.database.test1 RENAME to catalog.database.test2;
如果使用对象存储,如S3或OSS,请谨慎使用此语法,因为对象存储的重命名不是原子性的,在失败的情况下可能只会移动部分文件。
ALTER TABLE my_table ADD COLUMNS (
c1 INT,
c2 STRING
);
ALTER TABLE my_table RENAME COLUMN c0 TO c1;
ALTER TABLE my_table DROP COLUMNS (c1, c2);
ALTER TABLE my_table DROP PARTITION (`id` = 1, `name` = 'paimon');
ALTER TABLE my_table ALTER COLUMN buy_count COMMENT 'buy count';
ALTER TABLE my_table ADD COLUMN c INT FIRST;
ALTER TABLE my_table ADD COLUMN c INT AFTER b;
ALTER TABLE my_table ALTER COLUMN col_a FIRST;
ALTER TABLE my_table ALTER COLUMN col_a AFTER col_b;
ALTER TABLE my_table ALTER COLUMN col_a TYPE DOUBLE;