diff --git a/src/backend/distributed/planner/merge_planner.c b/src/backend/distributed/planner/merge_planner.c
index 5fee6e956ff..0abde46e40a 100644
--- a/src/backend/distributed/planner/merge_planner.c
+++ b/src/backend/distributed/planner/merge_planner.c
@@ -1546,8 +1546,14 @@ FetchAndValidateInsertVarIfExists(Oid targetRelationId, Query *query)
 			continue;
 		}
 
-		/* NOT MATCHED can have either INSERT or DO NOTHING */
-		if (action->commandType == CMD_NOTHING)
+		/*
+		 * NOT MATCHED [BY TARGET] can have either INSERT or DO NOTHING, and
+		 * NOT MATCHED BY SOURCE (PG17+) can have UPDATE, DELETE or DO NOTHING.
+		 * Only an INSERT action has an insert var to fetch and validate.
+		 */
+		if (action->commandType == CMD_NOTHING ||
+			action->commandType == CMD_UPDATE ||
+			action->commandType == CMD_DELETE)
 		{
 			return NULL;
 		}
diff --git a/src/test/regress/expected/pg17.out b/src/test/regress/expected/pg17.out index dfd88e30ec6..83507bb150d 100644 --- a/src/test/regress/expected/pg17.out +++ b/src/test/regress/expected/pg17.out @@ -2195,6 +2195,501 @@ CONTEXT: PL/pgSQL function public.explain_filter(text) line XX at FOR over EXEC RESET citus.log_remote_commands; -- End of EXPLAIN MEMORY SERIALIZE tests +-- Add support for MERGE ... WHEN NOT MATCHED BY SOURCE. 
+-- Relevant PG commit: +-- https://github.com/postgres/postgres/commit/0294df2f1 +SET citus.next_shard_id TO 1072025; +-- Regular Postgres tables +CREATE TABLE postgres_target_1 (tid integer, balance float, val text); +CREATE TABLE postgres_target_2 (tid integer, balance float, val text); +CREATE TABLE postgres_source (sid integer, delta float); +INSERT INTO postgres_target_1 SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id; +INSERT INTO postgres_target_2 SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id; +INSERT INTO postgres_source SELECT id, id * 10 FROM generate_series(1,14) AS id; +-- Citus local tables +CREATE TABLE citus_local_target (tid integer, balance float, val text); +CREATE TABLE citus_local_source (sid integer, delta float); +SELECT citus_add_local_table_to_metadata('citus_local_target'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_add_local_table_to_metadata('citus_local_source'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO citus_local_target SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id; +INSERT INTO citus_local_source SELECT id, id * 10 FROM generate_series(1,14) AS id; +-- Citus distributed tables +CREATE TABLE citus_distributed_target (tid integer, balance float, val text); +CREATE TABLE citus_distributed_source (sid integer, delta float); +SELECT create_distributed_table('citus_distributed_target', 'tid'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('citus_distributed_source', 'sid'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO citus_distributed_target SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id; +INSERT INTO 
citus_distributed_source SELECT id, id * 10 FROM generate_series(1,14) AS id; +-- Citus reference tables +CREATE TABLE citus_reference_target (tid integer, balance float, val text); +CREATE TABLE citus_reference_source (sid integer, delta float); +SELECT create_reference_table('citus_reference_target'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_reference_table('citus_reference_source'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO citus_reference_target SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id; +INSERT INTO citus_reference_source SELECT id, id * 10 FROM generate_series(1,14) AS id; +-- Try all combinations of tables with two queries: +-- 1: Simple Merge +-- 2: Merge with a constant qual +-- Run the merge queries with the postgres tables +-- to save the expected output +-- try simple MERGE +MERGE INTO postgres_target_1 t + USING postgres_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT * FROM postgres_target_1 ORDER BY tid, val; + tid | balance | val +--------------------------------------------------------------------- + 1 | 110 | initial updated by merge + 2 | 20 | inserted by merge + 3 | 330 | initial updated by merge + 4 | 40 | inserted by merge + 5 | 550 | initial updated by merge + 6 | 60 | inserted by merge + 7 | 770 | initial updated by merge + 8 | 80 | inserted by merge + 9 | 990 | initial updated by merge + 10 | 100 | inserted by merge + 11 | 1210 | initial updated by merge + 12 | 120 | inserted by merge + 13 | 1430 | initial updated by merge + 14 | 140 | inserted by merge + 15 | 1500 | initial not matched by source +(15 rows) + +-- same 
with a constant qual +MERGE INTO postgres_target_2 t + USING postgres_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT * FROM postgres_target_2 ORDER BY tid, val; + tid | balance | val +--------------------------------------------------------------------- + 1 | 110 | initial updated by merge + 2 | 20 | inserted by merge + 3 | 300 | initial not matched by source + 3 | 30 | inserted by merge + 4 | 40 | inserted by merge + 5 | 500 | initial not matched by source + 5 | 50 | inserted by merge + 6 | 60 | inserted by merge + 7 | 700 | initial not matched by source + 7 | 70 | inserted by merge + 8 | 80 | inserted by merge + 9 | 900 | initial not matched by source + 9 | 90 | inserted by merge + 10 | 100 | inserted by merge + 11 | 1100 | initial not matched by source + 11 | 110 | inserted by merge + 12 | 120 | inserted by merge + 13 | 1300 | initial not matched by source + 13 | 130 | inserted by merge + 14 | 140 | inserted by merge + 15 | 1500 | initial not matched by source +(21 rows) + +-- function to compare the output from Citus tables +-- with the expected output from Postgres tables +CREATE OR REPLACE FUNCTION compare_tables(table1 TEXT, table2 TEXT) RETURNS BOOLEAN AS $$ +DECLARE ret BOOL; +BEGIN +EXECUTE 'select count(*) = 0 from (( + SELECT * FROM ' || table1 || + ' EXCEPT + SELECT * FROM ' || table2 || ' ) + UNION ALL ( + SELECT * FROM ' || table2 || + ' EXCEPT + SELECT * FROM ' || table1 || ' ))' INTO ret; +RETURN ret; +END +$$ LANGUAGE PLPGSQL; +-- Local-Local +-- Let's also print the command here +-- try simple MERGE +BEGIN; +SET citus.log_local_commands TO on; +MERGE INTO citus_local_target t + USING citus_local_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' 
updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_local_target', 'postgres_target_1'); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- same with a constant qual +BEGIN; +MERGE INTO citus_local_target t + USING citus_local_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED BY TARGET THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_local_target', 'postgres_target_2'); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- Local-Reference +-- try simple MERGE +BEGIN; +MERGE INTO citus_local_target t + USING citus_reference_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED BY TARGET THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_local_target', 'postgres_target_1'); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- same with a constant qual +BEGIN; +MERGE INTO citus_local_target t + USING citus_reference_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_local_target', 'postgres_target_2'); + compare_tables 
+--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- Local-Distributed - Merge currently not supported, Feature in development. +-- try simple MERGE +MERGE INTO citus_local_target t + USING citus_distributed_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +ERROR: MERGE involving repartition of rows is supported only if the target is distributed +-- Distributed-Local +-- try simple MERGE +BEGIN; +MERGE INTO citus_distributed_target t + USING citus_local_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_distributed_target', 'postgres_target_1'); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- same with a constant qual +BEGIN; +MERGE INTO citus_distributed_target t + USING citus_local_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED BY TARGET THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_distributed_target', 'postgres_target_2'); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- Distributed-Distributed +-- try simple MERGE +BEGIN; +MERGE INTO citus_distributed_target t + USING citus_distributed_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + 
delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_distributed_target', 'postgres_target_1'); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- same with a constant qual +BEGIN; +MERGE INTO citus_distributed_target t + USING citus_distributed_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_distributed_target', 'postgres_target_2'); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- Distributed-Reference +-- try simple MERGE +BEGIN; +MERGE INTO citus_distributed_target t + USING citus_reference_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_distributed_target', 'postgres_target_1'); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- same with a constant qual +BEGIN; +MERGE INTO citus_distributed_target t + USING citus_reference_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT 
compare_tables('citus_distributed_target', 'postgres_target_2'); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- Reference-N/A - Reference table as target is not allowed in Merge +-- try simple MERGE +MERGE INTO citus_reference_target t + USING citus_distributed_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +ERROR: Reference table as target is not allowed in MERGE command +-- Complex repartition query example with a mix of tables +-- Example from blog post +-- https://www.citusdata.com/blog/2023/07/27/how-citus-12-supports-postgres-merge +-- Contains information about the machines in the manufacturing facility +CREATE TABLE machines ( + machine_id NUMERIC PRIMARY KEY, + machine_name VARCHAR(100), + location VARCHAR(50), + status VARCHAR(20) +); +SELECT create_reference_table('machines'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +-- Holds data on the various sensors installed on each machine +CREATE TABLE sensors ( + sensor_id NUMERIC PRIMARY KEY, + sensor_name VARCHAR(100), + machine_id NUMERIC, + sensor_type VARCHAR(50) +); +SELECT create_distributed_table('sensors', 'sensor_id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Stores real-time readings from the sensors +CREATE TABLE sensor_readings ( + reading_id NUMERIC , + sensor_id NUMERIC, + reading_value NUMERIC, + reading_timestamp TIMESTAMP +); +SELECT create_distributed_table('sensor_readings', 'sensor_id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Holds real-time sensor readings for machines on 
'Production Floor 1' +CREATE TABLE real_sensor_readings ( + real_reading_id NUMERIC , + sensor_id NUMERIC, + reading_value NUMERIC, + reading_timestamp TIMESTAMP +); +SELECT create_distributed_table('real_sensor_readings', 'sensor_id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Insert data into the machines table +INSERT INTO machines (machine_id, machine_name, location, status) +VALUES + (1, 'Machine A', 'Production Floor 1', 'Active'), + (2, 'Machine B', 'Production Floor 2', 'Active'), + (3, 'Machine C', 'Production Floor 1', 'Inactive'); +-- Insert data into the sensors table +INSERT INTO sensors (sensor_id, sensor_name, machine_id, sensor_type) +VALUES + (1, 'Temperature Sensor 1', 1, 'Temperature'), + (2, 'Pressure Sensor 1', 1, 'Pressure'), + (3, 'Temperature Sensor 2', 2, 'Temperature'), + (4, 'Vibration Sensor 1', 3, 'Vibration'); +-- Insert data into the real_sensor_readings table +INSERT INTO real_sensor_readings (real_reading_id, sensor_id, reading_value, reading_timestamp) +VALUES + (1, 1, 35.6, TIMESTAMP '2023-07-20 10:15:00'), + (2, 1, 36.8, TIMESTAMP '2023-07-20 10:30:00'), + (3, 2, 100.5, TIMESTAMP '2023-07-20 10:15:00'), + (4, 2, 101.2, TIMESTAMP '2023-07-20 10:30:00'), + (5, 3, 36.2, TIMESTAMP '2023-07-20 10:15:00'), + (6, 3, 36.5, TIMESTAMP '2023-07-20 10:30:00'), + (7, 4, 0.02, TIMESTAMP '2023-07-20 10:15:00'), + (8, 4, 0.03, TIMESTAMP '2023-07-20 10:30:00'); +-- Insert DUMMY data to use for WHEN NOT MATCHED BY SOURCE +INSERT INTO sensor_readings VALUES (0, 0, 0, TIMESTAMP '2023-07-20 10:15:00'); +SET client_min_messages TO DEBUG1; +-- Complex merge query which needs repartitioning +MERGE INTO sensor_readings SR +USING (SELECT +rsr.sensor_id, +AVG(rsr.reading_value) AS average_reading, +MAX(rsr.reading_timestamp) AS last_reading_timestamp, +MAX(rsr.real_reading_id) AS rid +FROM sensors s +INNER JOIN machines m ON s.machine_id = m.machine_id +INNER JOIN real_sensor_readings 
rsr ON s.sensor_id = rsr.sensor_id +WHERE m.location = 'Production Floor 1' +GROUP BY rsr.sensor_id +) NEW_READINGS +ON (SR.sensor_id = NEW_READINGS.sensor_id) +-- Existing reading, update it +WHEN MATCHED THEN +UPDATE SET reading_value = NEW_READINGS.average_reading, reading_timestamp = NEW_READINGS.last_reading_timestamp +-- New reading, record it +WHEN NOT MATCHED BY TARGET THEN +INSERT (reading_id, sensor_id, reading_value, reading_timestamp) +VALUES (NEW_READINGS.rid, NEW_READINGS.sensor_id, +NEW_READINGS.average_reading, NEW_READINGS.last_reading_timestamp) +-- Target has dummy entry not matched by source +-- dummy move change reading_value to 100 to notice the change +WHEN NOT MATCHED BY SOURCE THEN +UPDATE SET reading_value = 100; +DEBUG: A mix of distributed and reference table, try repartitioning +DEBUG: A mix of distributed and reference table, routable query is not possible +DEBUG: Creating MERGE repartition plan +DEBUG: Using column - index:0 from the source list to redistribute +DEBUG: Executing subplans of the source query and storing the results at the respective node(s) +DEBUG: Redistributing source result rows across nodes +DEBUG: Executing final MERGE on workers using intermediate results +DEBUG: +DEBUG: +RESET client_min_messages; +-- Expected output is: +-- reading_id | sensor_id | reading_value | reading_timestamp +-- ------------+-----------+------------------------+--------------------- +-- 0 | 0 | 100 | 2023-07-20 10:15:00 +-- 2 | 1 | 36.2000000000000000 | 2023-07-20 10:30:00 +-- 4 | 2 | 100.8500000000000000 | 2023-07-20 10:30:00 +-- 8 | 4 | 0.02500000000000000000 | 2023-07-20 10:30:00 +SELECT * FROM sensor_readings ORDER BY 1; + reading_id | sensor_id | reading_value | reading_timestamp +--------------------------------------------------------------------- + 0 | 0 | 100 | Thu Jul 20 10:15:00 2023 + 2 | 1 | 36.2000000000000000 | Thu Jul 20 10:30:00 2023 + 4 | 2 | 100.8500000000000000 | Thu Jul 20 10:30:00 2023 + 8 | 4 | 
0.02500000000000000000 | Thu Jul 20 10:30:00 2023 +(4 rows) + +-- End of MERGE ... WHEN NOT MATCHED BY SOURCE tests \set VERBOSITY terse SET client_min_messages TO WARNING; DROP SCHEMA pg17 CASCADE; diff --git a/src/test/regress/sql/pg17.sql b/src/test/regress/sql/pg17.sql index 70d5f68a84a..e4843db4490 100644 --- a/src/test/regress/sql/pg17.sql +++ b/src/test/regress/sql/pg17.sql @@ -1080,6 +1080,376 @@ select public.explain_filter('explain (analyze,serialize) create temp table expl RESET citus.log_remote_commands; -- End of EXPLAIN MEMORY SERIALIZE tests +-- Add support for MERGE ... WHEN NOT MATCHED BY SOURCE. +-- Relevant PG commit: +-- https://github.com/postgres/postgres/commit/0294df2f1 + +SET citus.next_shard_id TO 1072025; + +-- Regular Postgres tables +CREATE TABLE postgres_target_1 (tid integer, balance float, val text); +CREATE TABLE postgres_target_2 (tid integer, balance float, val text); +CREATE TABLE postgres_source (sid integer, delta float); +INSERT INTO postgres_target_1 SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id; +INSERT INTO postgres_target_2 SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id; +INSERT INTO postgres_source SELECT id, id * 10 FROM generate_series(1,14) AS id; + +-- Citus local tables +CREATE TABLE citus_local_target (tid integer, balance float, val text); +CREATE TABLE citus_local_source (sid integer, delta float); +SELECT citus_add_local_table_to_metadata('citus_local_target'); +SELECT citus_add_local_table_to_metadata('citus_local_source'); +INSERT INTO citus_local_target SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id; +INSERT INTO citus_local_source SELECT id, id * 10 FROM generate_series(1,14) AS id; +-- Citus distributed tables +CREATE TABLE citus_distributed_target (tid integer, balance float, val text); +CREATE TABLE citus_distributed_source (sid integer, delta float); +SELECT create_distributed_table('citus_distributed_target', 'tid'); +SELECT 
create_distributed_table('citus_distributed_source', 'sid'); +INSERT INTO citus_distributed_target SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id; +INSERT INTO citus_distributed_source SELECT id, id * 10 FROM generate_series(1,14) AS id; +-- Citus reference tables +CREATE TABLE citus_reference_target (tid integer, balance float, val text); +CREATE TABLE citus_reference_source (sid integer, delta float); +SELECT create_reference_table('citus_reference_target'); +SELECT create_reference_table('citus_reference_source'); +INSERT INTO citus_reference_target SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id; +INSERT INTO citus_reference_source SELECT id, id * 10 FROM generate_series(1,14) AS id; + +-- Try all combinations of tables with two queries: +-- 1: Simple Merge +-- 2: Merge with a constant qual + +-- Run the merge queries with the postgres tables +-- to save the expected output + +-- try simple MERGE +MERGE INTO postgres_target_1 t + USING postgres_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT * FROM postgres_target_1 ORDER BY tid, val; + +-- same with a constant qual +MERGE INTO postgres_target_2 t + USING postgres_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT * FROM postgres_target_2 ORDER BY tid, val; + +-- function to compare the output from Citus tables +-- with the expected output from Postgres tables + +CREATE OR REPLACE FUNCTION compare_tables(table1 TEXT, table2 TEXT) RETURNS BOOLEAN AS $$ +DECLARE ret BOOL; +BEGIN +EXECUTE 'select 
count(*) = 0 from (( + SELECT * FROM ' || table1 || + ' EXCEPT + SELECT * FROM ' || table2 || ' ) + UNION ALL ( + SELECT * FROM ' || table2 || + ' EXCEPT + SELECT * FROM ' || table1 || ' ))' INTO ret; +RETURN ret; +END +$$ LANGUAGE PLPGSQL; + +-- Local-Local +-- Let's also print the command here +-- try simple MERGE +BEGIN; +SET citus.log_local_commands TO on; +MERGE INTO citus_local_target t + USING citus_local_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_local_target', 'postgres_target_1'); +ROLLBACK; + +-- same with a constant qual +BEGIN; +MERGE INTO citus_local_target t + USING citus_local_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED BY TARGET THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_local_target', 'postgres_target_2'); +ROLLBACK; + +-- Local-Reference +-- try simple MERGE +BEGIN; +MERGE INTO citus_local_target t + USING citus_reference_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED BY TARGET THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_local_target', 'postgres_target_1'); +ROLLBACK; + +-- same with a constant qual +BEGIN; +MERGE INTO citus_local_target t + USING citus_reference_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES 
(sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_local_target', 'postgres_target_2'); +ROLLBACK; + +-- Local-Distributed - Merge currently not supported, Feature in development. +-- try simple MERGE +MERGE INTO citus_local_target t + USING citus_distributed_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; + +-- Distributed-Local +-- try simple MERGE +BEGIN; +MERGE INTO citus_distributed_target t + USING citus_local_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_distributed_target', 'postgres_target_1'); +ROLLBACK; + +-- same with a constant qual +BEGIN; +MERGE INTO citus_distributed_target t + USING citus_local_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED BY TARGET THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_distributed_target', 'postgres_target_2'); +ROLLBACK; + +-- Distributed-Distributed +-- try simple MERGE +BEGIN; +MERGE INTO citus_distributed_target t + USING citus_distributed_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by 
source'; +SELECT compare_tables('citus_distributed_target', 'postgres_target_1'); +ROLLBACK; + +-- same with a constant qual +BEGIN; +MERGE INTO citus_distributed_target t + USING citus_distributed_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_distributed_target', 'postgres_target_2'); +ROLLBACK; + +-- Distributed-Reference +-- try simple MERGE +BEGIN; +MERGE INTO citus_distributed_target t + USING citus_reference_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_distributed_target', 'postgres_target_1'); +ROLLBACK; + +-- same with a constant qual +BEGIN; +MERGE INTO citus_distributed_target t + USING citus_reference_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_distributed_target', 'postgres_target_2'); +ROLLBACK; + +-- Reference-N/A - Reference table as target is not allowed in Merge +-- try simple MERGE +MERGE INTO citus_reference_target t + USING citus_distributed_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; + +-- Complex 
repartition query example with a mix of tables +-- Example from blog post +-- https://www.citusdata.com/blog/2023/07/27/how-citus-12-supports-postgres-merge + +-- Contains information about the machines in the manufacturing facility +CREATE TABLE machines ( + machine_id NUMERIC PRIMARY KEY, + machine_name VARCHAR(100), + location VARCHAR(50), + status VARCHAR(20) +); +SELECT create_reference_table('machines'); + +-- Holds data on the various sensors installed on each machine +CREATE TABLE sensors ( + sensor_id NUMERIC PRIMARY KEY, + sensor_name VARCHAR(100), + machine_id NUMERIC, + sensor_type VARCHAR(50) +); +SELECT create_distributed_table('sensors', 'sensor_id'); + +-- Stores real-time readings from the sensors +CREATE TABLE sensor_readings ( + reading_id NUMERIC , + sensor_id NUMERIC, + reading_value NUMERIC, + reading_timestamp TIMESTAMP +); +SELECT create_distributed_table('sensor_readings', 'sensor_id'); + +-- Holds real-time sensor readings for machines on 'Production Floor 1' +CREATE TABLE real_sensor_readings ( + real_reading_id NUMERIC , + sensor_id NUMERIC, + reading_value NUMERIC, + reading_timestamp TIMESTAMP +); +SELECT create_distributed_table('real_sensor_readings', 'sensor_id'); + +-- Insert data into the machines table +INSERT INTO machines (machine_id, machine_name, location, status) +VALUES + (1, 'Machine A', 'Production Floor 1', 'Active'), + (2, 'Machine B', 'Production Floor 2', 'Active'), + (3, 'Machine C', 'Production Floor 1', 'Inactive'); + +-- Insert data into the sensors table +INSERT INTO sensors (sensor_id, sensor_name, machine_id, sensor_type) +VALUES + (1, 'Temperature Sensor 1', 1, 'Temperature'), + (2, 'Pressure Sensor 1', 1, 'Pressure'), + (3, 'Temperature Sensor 2', 2, 'Temperature'), + (4, 'Vibration Sensor 1', 3, 'Vibration'); + +-- Insert data into the real_sensor_readings table +INSERT INTO real_sensor_readings (real_reading_id, sensor_id, reading_value, reading_timestamp) +VALUES + (1, 1, 35.6, TIMESTAMP '2023-07-20 
10:15:00'), + (2, 1, 36.8, TIMESTAMP '2023-07-20 10:30:00'), + (3, 2, 100.5, TIMESTAMP '2023-07-20 10:15:00'), + (4, 2, 101.2, TIMESTAMP '2023-07-20 10:30:00'), + (5, 3, 36.2, TIMESTAMP '2023-07-20 10:15:00'), + (6, 3, 36.5, TIMESTAMP '2023-07-20 10:30:00'), + (7, 4, 0.02, TIMESTAMP '2023-07-20 10:15:00'), + (8, 4, 0.03, TIMESTAMP '2023-07-20 10:30:00'); + +-- Insert DUMMY data to use for WHEN NOT MATCHED BY SOURCE +INSERT INTO sensor_readings VALUES (0, 0, 0, TIMESTAMP '2023-07-20 10:15:00'); + +SET client_min_messages TO DEBUG1; +-- Complex merge query which needs repartitioning +MERGE INTO sensor_readings SR +USING (SELECT +rsr.sensor_id, +AVG(rsr.reading_value) AS average_reading, +MAX(rsr.reading_timestamp) AS last_reading_timestamp, +MAX(rsr.real_reading_id) AS rid +FROM sensors s +INNER JOIN machines m ON s.machine_id = m.machine_id +INNER JOIN real_sensor_readings rsr ON s.sensor_id = rsr.sensor_id +WHERE m.location = 'Production Floor 1' +GROUP BY rsr.sensor_id +) NEW_READINGS + +ON (SR.sensor_id = NEW_READINGS.sensor_id) + +-- Existing reading, update it +WHEN MATCHED THEN +UPDATE SET reading_value = NEW_READINGS.average_reading, reading_timestamp = NEW_READINGS.last_reading_timestamp + +-- New reading, record it +WHEN NOT MATCHED BY TARGET THEN +INSERT (reading_id, sensor_id, reading_value, reading_timestamp) +VALUES (NEW_READINGS.rid, NEW_READINGS.sensor_id, +NEW_READINGS.average_reading, NEW_READINGS.last_reading_timestamp) + +-- Target has dummy entry not matched by source +-- dummy move change reading_value to 100 to notice the change +WHEN NOT MATCHED BY SOURCE THEN +UPDATE SET reading_value = 100; + +RESET client_min_messages; + +-- Expected output is: +-- reading_id | sensor_id | reading_value | reading_timestamp +-- ------------+-----------+------------------------+--------------------- +-- 0 | 0 | 100 | 2023-07-20 10:15:00 +-- 2 | 1 | 36.2000000000000000 | 2023-07-20 10:30:00 +-- 4 | 2 | 100.8500000000000000 | 2023-07-20 10:30:00 +-- 8 | 4 | 
0.02500000000000000000 | 2023-07-20 10:30:00 +SELECT * FROM sensor_readings ORDER BY 1; + +-- End of MERGE ... WHEN NOT MATCHED BY SOURCE tests \set VERBOSITY terse SET client_min_messages TO WARNING;