Skip to content

Commit

Permalink
feat(pgmq): Partition Class
Browse files Browse the repository at this point in the history
- createPartitioned()
- showPartitions()
- runMaintenance()
  • Loading branch information
waitingsong committed Jan 10, 2025
1 parent 233f2db commit b50dc6d
Show file tree
Hide file tree
Showing 21 changed files with 608 additions and 15 deletions.
3 changes: 0 additions & 3 deletions .github/workflows/nodejs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,7 @@ jobs:
run: |
psql -V
dig postgres
echo "\l" | psql -h $PGMQ_HOST -p $PGMQ_PORT -U$PGMQ_USER -d postgres
psql -h $PGMQ_HOST -p $PGMQ_PORT -U$PGMQ_USER -d $PGMQ_DB -c "SELECT version();"
psql -h $PGMQ_HOST -p $PGMQ_PORT -U$PGMQ_USER -d $PGMQ_DB -c "\d+"
psql -h $PGMQ_HOST -p $PGMQ_PORT -U$PGMQ_USER -d $PGMQ_DB -c "SHOW TIMEZONE;"
env:
CI: true
PGMQ_HOST: postgres
Expand Down
4 changes: 3 additions & 1 deletion .scripts/ci/ci-init-db.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ dig postgres

PGPASSWORD="$PGMQ_PASSWORD"
psql -h $PGMQ_HOST -p $PGMQ_PORT -U$PGMQ_USER -d $PGMQ_DB -c "SHOW TIMEZONE;"
psql -h $PGMQ_HOST -p $PGMQ_PORT -U$PGMQ_USER -d $PGMQ_DB -c "SELECT extname, extversion FROM pg_extension;"

echo -e "\n"

Expand All @@ -16,5 +15,8 @@ SQL_DIR="$cwd/packages/pgmq-js/database/"
cd "$SQL_DIR"
. ./init-db.sh

psql -h $PGMQ_HOST -p $PGMQ_PORT -U$PGMQ_USER -d $PGMQ_DB -c "SELECT extname, extversion FROM pg_extension;"
echo "\l" | psql -h $PGMQ_HOST -p $PGMQ_PORT -U$PGMQ_USER -d postgres
psql -h $PGMQ_HOST -p $PGMQ_PORT -U$PGMQ_USER -d $PGMQ_DB -c "\d+"
cd "$cwd"

8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ psql -h $POSTGRES_HOST -p $POSTGRES_PORT -U$POSTGRES_USER -d $POSTGRES_DB -bq \
- [x] [purge_queue](https://tembo-io.github.io/pgmq/api/sql/functions/#purge_queue)
- [x] [archive (single)](https://tembo-io.github.io/pgmq/api/sql/functions/#archive-single)
- [x] [archive (batch)](https://tembo-io.github.io/pgmq/api/sql/functions/#archive-batch)
- [ ] [Queue Management](https://tembo-io.github.io/pgmq/api/sql/functions/#queue-management)
- [x] [Queue Management](https://tembo-io.github.io/pgmq/api/sql/functions/#queue-management)
- [x] [create](https://tembo-io.github.io/pgmq/api/sql/functions/#create)
- [ ] [create_partitioned](https://tembo-io.github.io/pgmq/api/sql/functions/#create_partitioned)
- [x] [create_partitioned](https://tembo-io.github.io/pgmq/api/sql/functions/#create_partitioned) see `Partition`
- [x] [create_unlogged](https://tembo-io.github.io/pgmq/api/sql/functions/#create_unlogged)
- [x] [detach_archive](https://tembo-io.github.io/pgmq/api/sql/functions/#detach_archive)
- [x] [drop_queue](https://tembo-io.github.io/pgmq/api/sql/functions/#drop_queue)
Expand All @@ -83,6 +83,10 @@ psql -h $POSTGRES_HOST -p $POSTGRES_PORT -U$POSTGRES_USER -d $POSTGRES_DB -bq \
- [x] [list_queues](https://tembo-io.github.io/pgmq/api/sql/functions/#list_queues)
- [x] [metrics](https://tembo-io.github.io/pgmq/api/sql/functions/#metrics)
- [x] [metrics_all](https://tembo-io.github.io/pgmq/api/sql/functions/#metrics_all)
- Partition
- [x] [create_partitioned](https://tembo-io.github.io/pgmq/api/sql/functions/#create_partitioned)
- [x] [show_partitions](https://github.com/pgpartman/pg_partman/blob/development/doc/pg_partman.md#show_partitions)
- [x] [run_maintenance](https://github.com/pgpartman/pg_partman/blob/development/doc/pg_partman.md#run_maintenance)



Expand Down
8 changes: 6 additions & 2 deletions packages/mwcp-pgmq-js/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,9 @@ Open url `http://127.0.0.1:7001/swagger-ui/index.html` with browser
- [x] [purge_queue](https://tembo-io.github.io/pgmq/api/sql/functions/#purge_queue)
- [x] [archive (single)](https://tembo-io.github.io/pgmq/api/sql/functions/#archive-single)
- [x] [archive (batch)](https://tembo-io.github.io/pgmq/api/sql/functions/#archive-batch)
- [ ] [Queue Management](https://tembo-io.github.io/pgmq/api/sql/functions/#queue-management)
- [x] [Queue Management](https://tembo-io.github.io/pgmq/api/sql/functions/#queue-management)
- [x] [create](https://tembo-io.github.io/pgmq/api/sql/functions/#create)
- [ ] [create_partitioned](https://tembo-io.github.io/pgmq/api/sql/functions/#create_partitioned)
- [x] [create_partitioned](https://tembo-io.github.io/pgmq/api/sql/functions/#create_partitioned) see `Partition`
- [x] [create_unlogged](https://tembo-io.github.io/pgmq/api/sql/functions/#create_unlogged)
- [x] [detach_archive](https://tembo-io.github.io/pgmq/api/sql/functions/#detach_archive)
- [x] [drop_queue](https://tembo-io.github.io/pgmq/api/sql/functions/#drop_queue)
Expand All @@ -153,6 +153,10 @@ Open url `http://127.0.0.1:7001/swagger-ui/index.html` with browser
- [x] [list_queues](https://tembo-io.github.io/pgmq/api/sql/functions/#list_queues)
- [x] [metrics](https://tembo-io.github.io/pgmq/api/sql/functions/#metrics)
- [x] [metrics_all](https://tembo-io.github.io/pgmq/api/sql/functions/#metrics_all)
- Partition
- [x] [create_partitioned](https://tembo-io.github.io/pgmq/api/sql/functions/#create_partitioned)
- [x] [show_partitions](https://github.com/pgpartman/pg_partman/blob/development/doc/pg_partman.md#show_partitions)
- [x] [run_maintenance](https://github.com/pgpartman/pg_partman/blob/development/doc/pg_partman.md#run_maintenance)


## License
Expand Down
17 changes: 14 additions & 3 deletions packages/pgmq-js/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,20 @@ Start a Postgres instance with the PGMQ extension installed:
docker run -d --name postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 quay.io/tembo/pg17-pgmq:latest
```

Create the pgmq extension
Create the pgmq extension and necessary tables
```sh
psql -h $PGMQ_HOST -p $PGMQ_PORT -U$PGMQ_USER -d $PGMQ_DB -bq \
-f database/default/ddl/extension.sql \
-f database/default/ddl/tb_queue_meta.sql \
-f database/default/ddl/tb_route.sql
```

Enable [maintenance] on the database(s)
```sql
ALTER SYSTEM SET pg_partman_bgw.dbname = 'postgres,db_ci_test';
SELECT pg_reload_conf();
```

## Usage

### init
Expand Down Expand Up @@ -165,9 +171,9 @@ interface SendRouteMsgResultItem {
- [x] [purge_queue](https://tembo-io.github.io/pgmq/api/sql/functions/#purge_queue)
- [x] [archive (single)](https://tembo-io.github.io/pgmq/api/sql/functions/#archive-single)
- [x] [archive (batch)](https://tembo-io.github.io/pgmq/api/sql/functions/#archive-batch)
- [ ] [Queue Management](https://tembo-io.github.io/pgmq/api/sql/functions/#queue-management)
- [x] [Queue Management](https://tembo-io.github.io/pgmq/api/sql/functions/#queue-management)
- [x] [create](https://tembo-io.github.io/pgmq/api/sql/functions/#create)
- [ ] [create_partitioned](https://tembo-io.github.io/pgmq/api/sql/functions/#create_partitioned)
- [x] [create_partitioned](https://tembo-io.github.io/pgmq/api/sql/functions/#create_partitioned) see `Partition`
- [x] [create_unlogged](https://tembo-io.github.io/pgmq/api/sql/functions/#create_unlogged)
- [x] [detach_archive](https://tembo-io.github.io/pgmq/api/sql/functions/#detach_archive)
- [x] [drop_queue](https://tembo-io.github.io/pgmq/api/sql/functions/#drop_queue)
Expand All @@ -176,6 +182,10 @@ interface SendRouteMsgResultItem {
- [x] [list_queues](https://tembo-io.github.io/pgmq/api/sql/functions/#list_queues)
- [x] [metrics](https://tembo-io.github.io/pgmq/api/sql/functions/#metrics)
- [x] [metrics_all](https://tembo-io.github.io/pgmq/api/sql/functions/#metrics_all)
- Partition
- [x] [create_partitioned](https://tembo-io.github.io/pgmq/api/sql/functions/#create_partitioned)
- [x] [show_partitions](https://github.com/pgpartman/pg_partman/blob/development/doc/pg_partman.md#show_partitions)
- [x] [run_maintenance](https://github.com/pgpartman/pg_partman/blob/development/doc/pg_partman.md#run_maintenance)


## License
Expand All @@ -197,3 +207,4 @@ interface SendRouteMsgResultItem {
[cli-ch]: https://github.com/waitingsong/pgmq-js/tree/main/packages/mwcp-pgmq-js/CHANGELOG.md

[PGMQ]: https://tembo-io.github.io/pgmq/
[maintenance]: https://github.com/pgpartman/pg_partman/blob/development/doc/pg_partman.md#run_maintenance
5 changes: 4 additions & 1 deletion packages/pgmq-js/database/default/ddl/extension.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@

-- create the extension in the "pgmq" schema
-- create the extension
CREATE EXTENSION pgmq;
CREATE EXTENSION pg_partman;

-- CREATE EXTENSION dblink;
-- CREATE EXTENSION pg_jobmon;

1 change: 1 addition & 0 deletions packages/pgmq-js/src/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export * from './queue-meta-manager/index.queue-meta.js'
export * from './msg-manager/index.msg.js'
export * from './router/index.router.js'
export * from './route-msg/index.route-msg.js'
export * from './partition/index.part.js'
export type { Transaction } from './knex.types.js'
export { genRandomName } from './helper.js'

10 changes: 10 additions & 0 deletions packages/pgmq-js/src/lib/partition/db.types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/* c8 ignore start */


export interface ShowPartitionsRecord {
partition_schemaname: string
partition_tablename: string
}


/* c8 ignore stop */
3 changes: 3 additions & 0 deletions packages/pgmq-js/src/lib/partition/index.part.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@

export * from './part.js'
export * from './part.types.js'
12 changes: 12 additions & 0 deletions packages/pgmq-js/src/lib/partition/part.helpers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import type { ShowPartitionsRecord } from './db.types.js'
import type { ShowPartitions } from './part.types.js'


export function parseShowPartitionRecord(input: ShowPartitionsRecord): ShowPartitions {
const ret: ShowPartitions = {
partitionSchemaname: input.partition_schemaname,
partitionTablename: input.partition_tablename,
}
return ret
}

13 changes: 13 additions & 0 deletions packages/pgmq-js/src/lib/partition/part.sql.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@

export enum PartSql {
/**
* @link https://github.com/pgpartman/pg_partman/blob/development/doc/pg_partman.md#show_partitions
*/
showPartitions = 'SELECT * FROM show_partitions(?::text, ?, ?)',
/**
* @link https://tembo-io.github.io/pgmq/api/sql/functions/#create_partitioned
*/
createPartitioned = 'SELECT pgmq.create_partitioned(?, ?, ?)',
runMaintenance = 'SELECT run_maintenance()',
runMaintenance2 = 'SELECT run_maintenance(?::text, ?, ?)',
}
118 changes: 118 additions & 0 deletions packages/pgmq-js/src/lib/partition/part.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import assert from 'node:assert'

import type { Knex, QueryResponse, Transaction } from '../knex.types.js'
import type { QueueMetaManager } from '../queue-meta-manager/index.queue-meta.js'

import type { ShowPartitionsRecord } from './db.types.js'
import { parseShowPartitionRecord } from './part.helpers.js'
import { PartSql } from './part.sql.js'
import type { CreatePartitionedQueueMetaOptions, RunMaintenanceOptions, ShowPartitions, ShowPartitionsOptions } from './part.types.js'


export class Partition {

constructor(
protected readonly dbh: Knex,
protected readonly queueMeta: QueueMetaManager,
) { }

/**
* @link https://tembo-io.github.io/pgmq/#partitioned-queues
*/
async createPartitioned(options: CreatePartitionedQueueMetaOptions): Promise<string> {
const opts: CreatePartitionedQueueMetaOptions = {
...options,
trx: options.trx ?? await this.startTransaction(),
queue: options.queue.toLowerCase(),
}
const { trx } = opts
assert(trx, 'Transaction is required')

await this._createPartitioned(opts)
const queueId = await this.queueMeta.create(opts)

if (! options.trx) {
await trx.commit()
}
return queueId
}


// #region showPartitions()

/**
* show partitions of a partitioned table
* @CAUTION queue must prefix with schema name, like 'public.my_queue'
* @description will throw error if
* - Given parent table not managed by pg_partman
*/
async showPartitions(options: ShowPartitionsOptions): Promise<ShowPartitions[]> {
const list = await this._showPartitions(options)
return list
}

// #region run_maintenance()

async runMaintenance(options?: RunMaintenanceOptions): Promise<void> {
const name = options?.queue?.toLowerCase() ?? null
const analyze = options?.analyze ?? false
const jobmon = options?.jobmon ?? true

const sql = name ? PartSql.runMaintenance2 : PartSql.runMaintenance
const data = name ? [name, analyze, jobmon] : null
await this.execute(sql, data, null)
}



private async _showPartitions(options: ShowPartitionsOptions): Promise<ShowPartitions[]> {
const { trx } = options
const name = options.queue.toLowerCase()
const ord = options.order ?? 'ASC'
const includeDefault = options.includeDefault ?? false

const sql = PartSql.showPartitions
const res = await this.execute<QueryResponse<ShowPartitionsRecord>>(sql, [name, ord, includeDefault], trx)

const ret = res.rows.map(parseShowPartitionRecord)
return ret
}


protected async _createPartitioned(options: CreatePartitionedQueueMetaOptions): Promise<void> {
const { queue, partitionInterval, retentionInterval, trx } = options
const sql = PartSql.createPartitioned
const args = [
queue,
partitionInterval ?? '1 month',
retentionInterval ?? '1 year',
]
await this.execute(sql, args, trx)
}


// #region common ------------------------------

protected async execute<T = unknown>(sql: string, params: unknown[] | null, trx: Transaction | undefined | null): Promise<T> {
if (trx) {
assert(! trx.isCompleted(), 'parameter trx is completed already')
}
const dbh = trx ?? this.dbh
try {
const res = await (params ? dbh.raw(sql, params) : dbh.raw(sql)) as T
return res
}
catch (ex) {
await trx?.rollback()
throw ex
}
}

protected async startTransaction(): Promise<Transaction> {
const ret = await this.dbh.transaction()
assert(ret, 'Transaction is required')
return ret
}

}

72 changes: 72 additions & 0 deletions packages/pgmq-js/src/lib/partition/part.types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/* c8 ignore start */

import type { QueueOptionsBase } from '../types.js'


export interface ShowPartitions {
partitionSchemaname: string
partitionTablename: string
}

/**
* @link https://github.com/pgpartman/pg_partman/blob/development/doc/pg_partman.md#show_partitions
*/
export interface ShowPartitionsOptions extends QueueOptionsBase {
/**
* @default 'ASC'
*/
order?: 'ASC' | 'DESC'
/**
* @default false
*/
includeDefault?: boolean
}

export enum PartMsg {
queueNotManaged = 'Given parent table not managed by pg_partman',
}


/**
* @link https://tembo-io.github.io/pgmq/api/sql/functions/#create_partitioned
* @link https://tembo-io.github.io/pgmq/#partitioned-queues
*/
export interface CreatePartitionedQueueMetaOptions extends QueueOptionsBase {
/**
* Pgsql Duration (eg. '1 days') or number string (eg. '100000')
* @default '1 month'
*/
partitionInterval?: string
/**
* Pgsql Duration (eg. '1 year') or number string (eg. '10000000')
* @default '1 year'
*/
retentionInterval?: string
/**
* Maximum 512 characters
*/
queueKey?: string | null
json?: object | null
}


/**
* @link https://github.com/pgpartman/pg_partman/blob/development/doc/pg_partman.md#run_maintenance
*/
export interface RunMaintenanceOptions {
/**
* Must prefix with schema name, like 'public.my_queue'
*/
queue?: string
/**
* @default false
*/
analyze?: boolean
/**
* @default true
*/
jobmon?: boolean
}


/* c8 ignore stop */
Loading

0 comments on commit b50dc6d

Please sign in to comment.