Postgres backend support #33
discovered after testing w/dapr
"google.golang.org/protobuf/encoding/protojson"

_ "github.com/jackc/pgx/v5/stdlib"
Although it's a bit more of a change, I recommend using pgx's native APIs and not the stdlib wrapper.
- The authors of pgx recommend using the stdlib wrapper only when you're trying to use the same code for multiple databases.
- The native driver has much better performance.
- The native driver offers some additional things (see my other comments below)
CREATE SCHEMA IF NOT EXISTS durabletask;

CREATE TABLE IF NOT EXISTS durabletask.Instances (
    InstanceID VARCHAR PRIMARY KEY NOT NULL,
Since these are UUIDs, consider using the native UUID type for Postgres. It's more efficient because it stores the UUID internally as a 16-byte binary value. This is especially helpful since the UUID is used as the primary key, so it appears in many indexes.
We don't restrict instance IDs to be UUIDs. This is just our convention for defaults. Many users, however, will desire to provide their own instance IDs to match some business entity or to implement the singleton pattern.
Makes sense
    Name VARCHAR NOT NULL, -- the type name of the orchestration or entity
    Version VARCHAR NULL, -- the version of the orchestration (optional)
    RuntimeStatus VARCHAR NOT NULL,
    CreatedTime TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
For all timestamps, I recommend using "TIMESTAMP WITH TIME ZONE".
    Input VARCHAR NULL,
    Output VARCHAR NULL,
    CustomStatus VARCHAR NULL,
    FailureDetails JSON NULL,
Are you planning on querying the fields inside the FailureDetails section? If not, it's best to store the JSON in a VARCHAR (i.e. TEXT) column. A JSON column (and in theory JSONB should be used instead) is useful when you need to query inside a JSON document, but if it's just for storing data, TEXT/VARCHAR is more efficient.
CREATE INDEX IF NOT EXISTS IX_Instances_CreatedTime ON durabletask.Instances(CreatedTime);

CREATE TABLE IF NOT EXISTS durabletask.History (
    InstanceID VARCHAR NOT NULL,
Would it make sense to make all these InstanceIDs foreign key references? This way, if the record is deleted from the Instances table, the deletion is cascaded.
be.workerName,     // LockedBy for Instances table
newLockExpiration, // Updated LockExpiration for Instances table
now,               // LockExpiration for Instances table
now,               // VisibleTime for NewEvents table
Duplicate. You can use $3 twice in the query and pass only one parameter.
UPDATE durabletask.NewEvents SET DequeueCount = DequeueCount + 1, LockedBy = $1 WHERE ctid IN (
    SELECT ctid FROM durabletask.NewEvents
    WHERE InstanceID = $2 AND (VisibleTime IS NULL OR VisibleTime <= $3)
    LIMIT 1000
Magic number?
}

e := new(protos.HistoryEvent)
err := protojson.Unmarshal(eventPayload, e)
Why not store protobuf in the DB instead? It's more efficient than JSON anyway, as long as we don't care about users wanting to modify the data by hand.
Debuggability. It's incredibly painful to try and inspect the database when things go wrong if we store everything as protobuf (which is what we do currently in Dapr and in the SQLite database). I thought the performance vs. debuggability tradeoff would be worthwhile.
That can make sense. I would still recommend changing the column types to TEXT however, unless you need to perform queries that look into the JSON data.
if ctx.Err() != nil {
	return nil, ctx.Err()
}
Suggested change:
- if ctx.Err() != nil {
- 	return nil, ctx.Err()
- }
+ if errors.Is(err, ctx.Err()) {
+ 	return nil, ctx.Err()
+ }
_, err = tx.ExecContext(ctx, "INSERT INTO durabletask.NewEvents (InstanceID, EventPayload) VALUES ($1, $2)", string(wi.InstanceID), bytes)
if err != nil {
	if ctx.Err() != nil {
		return ctx.Err()
	}
	return fmt.Errorf("failed to insert into NewEvents table: %w", err)
}

dbResult, err := tx.ExecContext(ctx, "DELETE FROM durabletask.NewTasks WHERE SequenceNumber = $1 AND LockedBy = $2", wi.SequenceNumber, wi.LockedBy)
For funsies, you could perform both these queries in a single statement to avoid the use of a transaction, so you'd reduce the number of round-trips from 4 to 1 and avoid locking the tables...
WITH del AS (
DELETE FROM durabletask.NewTasks
WHERE SequenceNumber = $1
AND LockedBy = $2
)
INSERT INTO durabletask.NewEvents (InstanceID, EventPayload) VALUES ($3, $4)
See for example: https://dba.stackexchange.com/questions/267243/want-to-delete-then-insert-with-single-statement-using-a-cte-in-postgres
    ExecutionID VARCHAR NOT NULL,
    Name VARCHAR NOT NULL, -- the type name of the orchestration or entity
    Version VARCHAR NULL, -- the version of the orchestration (optional)
    RuntimeStatus VARCHAR NOT NULL,
Consider using an ENUM type here: https://www.postgresql.org/docs/current/datatype-enum.html
}
defer tx.Rollback()

row := tx.QueryRowContext(ctx, "SELECT 1 FROM durabletask.Instances WHERE InstanceID = $1", string(id))
Here's another option for writing this method without a transaction, using a single atomic query:
- For the History table, make InstanceID a foreign key reference to Instances.InstanceID, with an "ON DELETE CASCADE". This means that if the row is deleted from Instances, it's automatically (and atomically) deleted from History too.
- Next, use this query that performs both a lookup and a deletion atomically, without the need for a transaction:
WITH del AS (
DELETE
FROM durabletask.Instances
WHERE InstanceID = $1
AND RuntimeStatus IN ('COMPLETED', 'FAILED', 'TERMINATED')
RETURNING InstanceID
), delcount AS (
SELECT count(*) AS count FROM del
)
SELECT delcount.count
FROM durabletask.Instances, delcount
WHERE durabletask.Instances.InstanceID = $1
This query has 3 possible states:
- No rows returned: the instance doesn't exist (you can return api.ErrInstanceNotFound)
- Returned value is "0": the instance exists but it's running (you can return api.ErrNotCompleted)
- Returned value is "1": the instance exists and it's been deleted successfully (it was in completed/failed/terminated state)
Hi, I am also interested in having Postgres support. Do you have a timeline for when this will land?
Any updates on whether the team will be working on this? :)
This PR adds support for Postgres as a built-in alternative to SQLite for storing orchestration state. The idea is to support an open-source database that can be used more easily in scaled-out deployments.
I've also updated the project to target Go 1.20 by default since 1.18 is no longer supported.