Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Postgres backend support #33

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open

Postgres backend support #33

wants to merge 3 commits into from

Conversation

cgillum
Copy link
Member

@cgillum cgillum commented Oct 6, 2023

This PR adds support for postgres as as a built-in alternative to sqlite for storing orchestration state. The idea is to support an open-source database that can be used more easily in scaled-out deployments.

I've also updated the project to target go 1.20 by default since 1.18 is no longer supported.


"google.golang.org/protobuf/encoding/protojson"

_ "github.com/jackc/pgx/v5/stdlib"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although it's a bit more of a change, I recommend using pgx's native APIs and not the stdlib wrapper.

  1. The authors of pgx recommend using the stdlib wrapper only when you're trying to use the same code for multiple databases.
  2. The native driver has much better performance.
  3. The native driver offers some additional things (see my other comments below)

CREATE SCHEMA IF NOT EXISTS durabletask;

CREATE TABLE IF NOT EXISTS durabletask.Instances (
InstanceID VARCHAR PRIMARY KEY NOT NULL,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since these are UUIDs, consider using the native UUID type for Postgres. It's more efficient as it stores the UUID as a binary sequence (internally). This is especially helpful since the UUID is used as primary key, so it's in many indexes

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't restrict instance IDs to be UUIDs. This is just our convention for defaults. Many users, however, will desire to provide their own instance IDs to match some business entity or to implement the singleton pattern.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense

Name VARCHAR NOT NULL, -- the type name of the orchestration or entity
Version VARCHAR NULL, -- the version of the orchestration (optional)
RuntimeStatus VARCHAR NOT NULL,
CreatedTime TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For all timestaps, I recommend using "TIMESTAMP WITH TIME ZONE"

Input VARCHAR NULL,
Output VARCHAR NULL,
CustomStatus VARCHAR NULL,
FailureDetails JSON NULL,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you planning on querying the fields inside the FailureDetails section? If not, best to store JSON in a VARCHAR (i.e. TEXT) column. The JSON column (and in theory, JSONB should be used instead) is useful to be able to query inside a JSON document, but if it's just for storing data, using TEXT/VARCHAR is more efficient.

CREATE INDEX IF NOT EXISTS IX_Instances_CreatedTime ON durabletask.Instances(CreatedTime);

CREATE TABLE IF NOT EXISTS durabletask.History (
InstanceID VARCHAR NOT NULL,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to make all these InstanceID's foreign key references? This way, if the record is deleted from the Instances table, the deletion is cascaded

be.workerName, // LockedBy for Instances table
newLockExpiration, // Updated LockExpiration for Instances table
now, // LockExpiration for Instances table
now, // VisibleTime for NewEvents table
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicate. Can use $3 twice and pass one parameter only

UPDATE durabletask.NewEvents SET DequeueCount = DequeueCount + 1, LockedBy = $1 WHERE ctid IN (
SELECT ctid FROM durabletask.NewEvents
WHERE InstanceID = $2 AND (VisibleTime IS NULL OR VisibleTime <= $3)
LIMIT 1000
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Magic number?

}

e := new(protos.HistoryEvent)
err := protojson.Unmarshal(eventPayload, e)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not storing protobuf inside the DB instead? It's more efficient than JSON anyways, as long as we don't care that users may want to modify the data themselves by hand?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debuggability. It's incredibly painful to try and inspect the database when things go wrong if we store everything as protobuf (which is what we do currently in Dapr and in the SQLite database). I thought the performance vs. debuggability tradeoff would be worthwhile.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That can make sense. I would still recommend changing the column types to TEXT however, unless you need to perform queries that look into the JSON data.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on lines +718 to +720
if ctx.Err() != nil {
return nil, ctx.Err()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if ctx.Err() != nil {
return nil, ctx.Err()
}
if errors.Is(err, ctx.Err()) {
return nil, ctx.Err()
}

Comment on lines +809 to +817
_, err = tx.ExecContext(ctx, "INSERT INTO durabletask.NewEvents (InstanceID, EventPayload) VALUES ($1, $2)", string(wi.InstanceID), bytes)
if err != nil {
if ctx.Err() != nil {
return ctx.Err()
}
return fmt.Errorf("failed to insert into NewEvents table: %w", err)
}

dbResult, err := tx.ExecContext(ctx, "DELETE FROM durabletask.NewTasks WHERE SequenceNumber = $1 AND LockedBy = $2", wi.SequenceNumber, wi.LockedBy)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For funsies, you could perform both these queries in a single statement to avoid the use of a transaction, so you'd reduce the number of round-trips from 4 to 1 and avoid locking the tables...

WITH del AS (
    DELETE FROM durabletask.NewTasks
        WHERE SequenceNumber = $1
        AND LockedBy = $2
)
INSERT INTO durabletask.NewEvents (InstanceID, EventPayload) VALUES ($3, $4)

See for example: https://dba.stackexchange.com/questions/267243/want-to-delete-then-insert-with-single-statement-using-a-cte-in-postgres

ExecutionID VARCHAR NOT NULL,
Name VARCHAR NOT NULL, -- the type name of the orchestration or entity
Version VARCHAR NULL, -- the version of the orchestration (optional)
RuntimeStatus VARCHAR NOT NULL,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
defer tx.Rollback()

row := tx.QueryRowContext(ctx, "SELECT 1 FROM durabletask.Instances WHERE InstanceID = $1", string(id))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's another option to write this method without using transactions and a single atomic query:

  1. For the History table, make InstanceID a foreign key reference to Instances.InstanceID, with an "ON DELETE CASCADE". This means that if the row is deleted from Instances, it's automatically (and atomically) deleted from History too.
  2. Next, use this query that performs both a lookup and a deletion atomically, without the need for a transaction:
WITH del AS (
	DELETE
		FROM durabletask.Instances
		WHERE InstanceID = $1
		AND RuntimeStatus IN ('COMPLETED', 'FAILED', 'TERMINATED')
		RETURNING InstanceID
), delcount AS (
	SELECT count(*) AS count FROM del
)
SELECT delcount.count
	FROM durabletask.Instances, delcount
	WHERE durabletask.Instances.InstanceID = $1

This query has 3 possible states:

  1. No rows returned: this means the instance doesn't exist (you can return api.ErrInstanceNotFound)
  2. Returned value is "0" -> The instance exists but it's running (you can return api.ErrNotCompleted)
  3. Returned value is "1" -> The instance exists and it's been deleted successfully (it was in completed/failed/terminated state)

@balchua
Copy link

balchua commented Dec 30, 2023

Hi i am also interested in having postgres support, do you have the timeline on when this will land?
Thanks

@ivan-penchev
Copy link

Any updates, if the team would be working on this :)?

@acx1729 acx1729 mentioned this pull request Nov 21, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants