diff --git a/.github/workflows/nodejs.yml b/.github/workflows/nodejs.yml index a0b5083..db4d62a 100644 --- a/.github/workflows/nodejs.yml +++ b/.github/workflows/nodejs.yml @@ -65,10 +65,7 @@ jobs: run: | psql -V dig postgres - echo "\l" | psql -h $PGMQ_HOST -p $PGMQ_PORT -U$PGMQ_USER -d postgres psql -h $PGMQ_HOST -p $PGMQ_PORT -U$PGMQ_USER -d $PGMQ_DB -c "SELECT version();" - psql -h $PGMQ_HOST -p $PGMQ_PORT -U$PGMQ_USER -d $PGMQ_DB -c "\d+" - psql -h $PGMQ_HOST -p $PGMQ_PORT -U$PGMQ_USER -d $PGMQ_DB -c "SHOW TIMEZONE;" env: CI: true PGMQ_HOST: postgres diff --git a/.scripts/ci/ci-init-db.sh b/.scripts/ci/ci-init-db.sh index 789f9b3..1663cec 100755 --- a/.scripts/ci/ci-init-db.sh +++ b/.scripts/ci/ci-init-db.sh @@ -7,7 +7,6 @@ dig postgres PGPASSWORD="$PGMQ_PASSWORD" psql -h $PGMQ_HOST -p $PGMQ_PORT -U$PGMQ_USER -d $PGMQ_DB -c "SHOW TIMEZONE;" -psql -h $PGMQ_HOST -p $PGMQ_PORT -U$PGMQ_USER -d $PGMQ_DB -c "SELECT extname, extversion FROM pg_extension;" echo -e "\n" @@ -16,5 +15,8 @@ SQL_DIR="$cwd/packages/pgmq-js/database/" cd "$SQL_DIR" . ./init-db.sh +psql -h $PGMQ_HOST -p $PGMQ_PORT -U$PGMQ_USER -d $PGMQ_DB -c "SELECT extname, extversion FROM pg_extension;" +echo "\l" | psql -h $PGMQ_HOST -p $PGMQ_PORT -U$PGMQ_USER -d postgres +psql -h $PGMQ_HOST -p $PGMQ_PORT -U$PGMQ_USER -d $PGMQ_DB -c "\d+" cd "$cwd" diff --git a/README.md b/README.md index 24b6683..96372cd 100644 --- a/README.md +++ b/README.md @@ -72,9 +72,9 @@ psql -h $POSTGRES_HOST -p $POSTGRES_PORT -U$POSTGRES_USER -d $POSTGRES_DB -bq \ - [x] [purge_queue](https://tembo-io.github.io/pgmq/api/sql/functions/#purge_queue) - [x] [archive (single)](https://tembo-io.github.io/pgmq/api/sql/functions/#archive-single) - [x] [archive (batch)](https://tembo-io.github.io/pgmq/api/sql/functions/#archive-batch) -- [ ] [Queue Management](https://tembo-io.github.io/pgmq/api/sql/functions/#queue-management) +- [x] [Queue Management](https://tembo-io.github.io/pgmq/api/sql/functions/#queue-management) - [x] [create](https://tembo-io.github.io/pgmq/api/sql/functions/#create) - - [ ] [create_partitioned](https://tembo-io.github.io/pgmq/api/sql/functions/#create_partitioned) + - [x] [create_partitioned](https://tembo-io.github.io/pgmq/api/sql/functions/#create_partitioned) see `Partition` - [x] [create_unlogged](https://tembo-io.github.io/pgmq/api/sql/functions/#create_unlogged) - [x] [detach_archive](https://tembo-io.github.io/pgmq/api/sql/functions/#detach_archive) - [x] [drop_queue](https://tembo-io.github.io/pgmq/api/sql/functions/#drop_queue) @@ -83,6 +83,10 @@ psql -h $POSTGRES_HOST -p $POSTGRES_PORT -U$POSTGRES_USER -d $POSTGRES_DB -bq \ - [x] [list_queues](https://tembo-io.github.io/pgmq/api/sql/functions/#list_queues) - [x] [metrics](https://tembo-io.github.io/pgmq/api/sql/functions/#metrics) - [x] [metrics_all](https://tembo-io.github.io/pgmq/api/sql/functions/#metrics_all) +- Partition + - [x] [create_partitioned](https://tembo-io.github.io/pgmq/api/sql/functions/#create_partitioned) + - [x] [show_partitions](https://github.com/pgpartman/pg_partman/blob/development/doc/pg_partman.md#show_partitions) + - [x] [run_maintenance](https://github.com/pgpartman/pg_partman/blob/development/doc/pg_partman.md#run_maintenance) diff --git a/packages/mwcp-pgmq-js/README.md b/packages/mwcp-pgmq-js/README.md index c677f8a..e30b020 100644 --- a/packages/mwcp-pgmq-js/README.md +++ b/packages/mwcp-pgmq-js/README.md @@ -142,9 +142,9 @@ Open url `http://127.0.0.1:7001/swagger-ui/index.html` with browser - [x] [purge_queue](https://tembo-io.github.io/pgmq/api/sql/functions/#purge_queue) - [x] [archive (single)](https://tembo-io.github.io/pgmq/api/sql/functions/#archive-single) - [x] [archive (batch)](https://tembo-io.github.io/pgmq/api/sql/functions/#archive-batch) -- [ ] [Queue Management](https://tembo-io.github.io/pgmq/api/sql/functions/#queue-management) +- [x] [Queue Management](https://tembo-io.github.io/pgmq/api/sql/functions/#queue-management) - [x] [create](https://tembo-io.github.io/pgmq/api/sql/functions/#create) - - [ ] [create_partitioned](https://tembo-io.github.io/pgmq/api/sql/functions/#create_partitioned) + - [x] [create_partitioned](https://tembo-io.github.io/pgmq/api/sql/functions/#create_partitioned) see `Partition` - [x] [create_unlogged](https://tembo-io.github.io/pgmq/api/sql/functions/#create_unlogged) - [x] [detach_archive](https://tembo-io.github.io/pgmq/api/sql/functions/#detach_archive) - [x] [drop_queue](https://tembo-io.github.io/pgmq/api/sql/functions/#drop_queue) @@ -153,6 +153,10 @@ Open url `http://127.0.0.1:7001/swagger-ui/index.html` with browser - [x] [list_queues](https://tembo-io.github.io/pgmq/api/sql/functions/#list_queues) - [x] [metrics](https://tembo-io.github.io/pgmq/api/sql/functions/#metrics) - [x] [metrics_all](https://tembo-io.github.io/pgmq/api/sql/functions/#metrics_all) +- Partition + - [x] [create_partitioned](https://tembo-io.github.io/pgmq/api/sql/functions/#create_partitioned) + - [x] [show_partitions](https://github.com/pgpartman/pg_partman/blob/development/doc/pg_partman.md#show_partitions) + - [x] [run_maintenance](https://github.com/pgpartman/pg_partman/blob/development/doc/pg_partman.md#run_maintenance) ## License diff --git a/packages/pgmq-js/README.md b/packages/pgmq-js/README.md index 1a280a4..d80cc64 100644 --- a/packages/pgmq-js/README.md +++ b/packages/pgmq-js/README.md @@ -30,7 +30,7 @@ Start a Postgres instance with the PGMQ extension installed: docker run -d --name postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 quay.io/tembo/pg17-pgmq:latest ``` -Create the pgmq extension +Create the pgmq extension and necessary tables ```sh psql -h $PGMQ_HOST -p $PGMQ_PORT -U$PGMQ_USER -d $PGMQ_DB -bq \ -f database/default/ddl/extension.sql \ @@ -38,6 +38,12 @@ psql -h $PGMQ_HOST -p $PGMQ_PORT -U$PGMQ_USER -d $PGMQ_DB -bq \ -f database/default/ddl/tb_route.sql ``` +Enable [maintenance] on the database(s) +```sql +ALTER SYSTEM SET pg_partman_bgw.dbname = 'postgres,db_ci_test'; +SELECT pg_reload_conf(); +``` + ## Usage ### init @@ -165,9 +171,9 @@ interface SendRouteMsgResultItem { - [x] [purge_queue](https://tembo-io.github.io/pgmq/api/sql/functions/#purge_queue) - [x] [archive (single)](https://tembo-io.github.io/pgmq/api/sql/functions/#archive-single) - [x] [archive (batch)](https://tembo-io.github.io/pgmq/api/sql/functions/#archive-batch) -- [ ] [Queue Management](https://tembo-io.github.io/pgmq/api/sql/functions/#queue-management) +- [x] [Queue Management](https://tembo-io.github.io/pgmq/api/sql/functions/#queue-management) - [x] [create](https://tembo-io.github.io/pgmq/api/sql/functions/#create) - - [ ] [create_partitioned](https://tembo-io.github.io/pgmq/api/sql/functions/#create_partitioned) + - [x] [create_partitioned](https://tembo-io.github.io/pgmq/api/sql/functions/#create_partitioned) see `Partition` - [x] [create_unlogged](https://tembo-io.github.io/pgmq/api/sql/functions/#create_unlogged) - [x] [detach_archive](https://tembo-io.github.io/pgmq/api/sql/functions/#detach_archive) - [x] [drop_queue](https://tembo-io.github.io/pgmq/api/sql/functions/#drop_queue) @@ -176,6 +182,10 @@ interface SendRouteMsgResultItem { - [x] [list_queues](https://tembo-io.github.io/pgmq/api/sql/functions/#list_queues) - [x] [metrics](https://tembo-io.github.io/pgmq/api/sql/functions/#metrics) - [x] [metrics_all](https://tembo-io.github.io/pgmq/api/sql/functions/#metrics_all) +- Partition + - [x] [create_partitioned](https://tembo-io.github.io/pgmq/api/sql/functions/#create_partitioned) + - [x] [show_partitions](https://github.com/pgpartman/pg_partman/blob/development/doc/pg_partman.md#show_partitions) + - [x] [run_maintenance](https://github.com/pgpartman/pg_partman/blob/development/doc/pg_partman.md#run_maintenance) ## License @@ -197,3 +207,4 @@ interface SendRouteMsgResultItem { [cli-ch]: https://github.com/waitingsong/pgmq-js/tree/main/packages/mwcp-pgmq-js/CHANGELOG.md [PGMQ]: https://tembo-io.github.io/pgmq/ +[maintenance]: https://github.com/pgpartman/pg_partman/blob/development/doc/pg_partman.md#run_maintenance diff --git a/packages/pgmq-js/database/default/ddl/extension.sql b/packages/pgmq-js/database/default/ddl/extension.sql index 17b6734..bfc5855 100644 --- a/packages/pgmq-js/database/default/ddl/extension.sql +++ b/packages/pgmq-js/database/default/ddl/extension.sql @@ -1,5 +1,8 @@ --- create the extension in the "pgmq" schema +-- create the extension CREATE EXTENSION pgmq; CREATE EXTENSION pg_partman; +-- CREATE EXTENSION dblink; +-- CREATE EXTENSION pg_jobmon; + diff --git a/packages/pgmq-js/src/lib/index.ts b/packages/pgmq-js/src/lib/index.ts index 07d5b81..d92aa46 100644 --- a/packages/pgmq-js/src/lib/index.ts +++ b/packages/pgmq-js/src/lib/index.ts @@ -7,6 +7,7 @@ export * from './queue-meta-manager/index.queue-meta.js' export * from './msg-manager/index.msg.js' export * from './router/index.router.js' export * from './route-msg/index.route-msg.js' +export * from './partition/index.part.js' export type { Transaction } from './knex.types.js' export { genRandomName } from './helper.js' diff --git a/packages/pgmq-js/src/lib/partition/db.types.ts b/packages/pgmq-js/src/lib/partition/db.types.ts new file mode 100644 index 0000000..c9ff257 --- /dev/null +++ b/packages/pgmq-js/src/lib/partition/db.types.ts @@ -0,0 +1,10 @@ +/* c8 ignore start */ + + +export interface ShowPartitionsRecord { + partition_schemaname: string + partition_tablename: string +} + + +/* c8 ignore stop */ diff --git a/packages/pgmq-js/src/lib/partition/index.part.ts b/packages/pgmq-js/src/lib/partition/index.part.ts new file mode 100644 index 0000000..830d630 --- /dev/null +++ b/packages/pgmq-js/src/lib/partition/index.part.ts @@ -0,0 +1,3 @@ + +export * from './part.js' +export * from './part.types.js' diff --git a/packages/pgmq-js/src/lib/partition/part.helpers.ts b/packages/pgmq-js/src/lib/partition/part.helpers.ts new file mode 100644 index 0000000..909b5b1 --- /dev/null +++ b/packages/pgmq-js/src/lib/partition/part.helpers.ts @@ -0,0 +1,12 @@ +import type { ShowPartitionsRecord } from './db.types.js' +import type { ShowPartitions } from './part.types.js' + + +export function parseShowPartitionRecord(input: ShowPartitionsRecord): ShowPartitions { + const ret: ShowPartitions = { + partitionSchemaname: input.partition_schemaname, + partitionTablename: input.partition_tablename, + } + return ret +} + diff --git a/packages/pgmq-js/src/lib/partition/part.sql.ts b/packages/pgmq-js/src/lib/partition/part.sql.ts new file mode 100644 index 0000000..399298a --- /dev/null +++ b/packages/pgmq-js/src/lib/partition/part.sql.ts @@ -0,0 +1,13 @@ + +export enum PartSql { + /** + * @link https://github.com/pgpartman/pg_partman/blob/development/doc/pg_partman.md#show_partitions + */ + showPartitions = 'SELECT * FROM show_partitions(?::text, ?, ?)', + /** + * @link https://tembo-io.github.io/pgmq/api/sql/functions/#create_partitioned + */ + createPartitioned = 'SELECT pgmq.create_partitioned(?, ?, ?)', + runMaintenance = 'SELECT run_maintenance()', + runMaintenance2 = 'SELECT run_maintenance(?::text, ?, ?)', +} diff --git a/packages/pgmq-js/src/lib/partition/part.ts b/packages/pgmq-js/src/lib/partition/part.ts new file mode 100644 index 0000000..1121f40 --- /dev/null +++ b/packages/pgmq-js/src/lib/partition/part.ts @@ -0,0 +1,118 @@ +import assert from 'node:assert' + +import type { Knex, QueryResponse, Transaction } from '../knex.types.js' +import type { QueueMetaManager } from '../queue-meta-manager/index.queue-meta.js' + +import type { ShowPartitionsRecord } from './db.types.js' +import { parseShowPartitionRecord } from './part.helpers.js' +import { PartSql } from './part.sql.js' +import type { CreatePartitionedQueueMetaOptions, RunMaintenanceOptions, ShowPartitions, ShowPartitionsOptions } from './part.types.js' + + +export class Partition { + + constructor( + protected readonly dbh: Knex, + protected readonly queueMeta: QueueMetaManager, + ) { } + + /** + * @link https://tembo-io.github.io/pgmq/#partitioned-queues + */ + async createPartitioned(options: CreatePartitionedQueueMetaOptions): Promise { + const opts: CreatePartitionedQueueMetaOptions = { + ...options, + trx: options.trx ?? await this.startTransaction(), + queue: options.queue.toLowerCase(), + } + const { trx } = opts + assert(trx, 'Transaction is required') + + await this._createPartitioned(opts) + const queueId = await this.queueMeta.create(opts) + + if (! options.trx) { + await trx.commit() + } + return queueId + } + + + // #region showPartitions() + + /** + * show partitions of a partitioned table + * @CAUTION queue must prefix with schema name, like 'public.my_queue' + * @description will throw error if + * - Given parent table not managed by pg_partman + */ + async showPartitions(options: ShowPartitionsOptions): Promise { + const list = await this._showPartitions(options) + return list + } + + // #region run_maintenance() + + async runMaintenance(options?: RunMaintenanceOptions): Promise { + const name = options?.queue?.toLowerCase() ?? null + const analyze = options?.analyze ?? false + const jobmon = options?.jobmon ?? true + + const sql = name ? PartSql.runMaintenance2 : PartSql.runMaintenance + const data = name ? [name, analyze, jobmon] : null + await this.execute(sql, data, null) + } + + + + private async _showPartitions(options: ShowPartitionsOptions): Promise { + const { trx } = options + const name = options.queue.toLowerCase() + const ord = options.order ?? 'ASC' + const includeDefault = options.includeDefault ?? false + + const sql = PartSql.showPartitions + const res = await this.execute>(sql, [name, ord, includeDefault], trx) + + const ret = res.rows.map(parseShowPartitionRecord) + return ret + } + + + protected async _createPartitioned(options: CreatePartitionedQueueMetaOptions): Promise { + const { queue, partitionInterval, retentionInterval, trx } = options + const sql = PartSql.createPartitioned + const args = [ + queue, + partitionInterval ?? '1 month', + retentionInterval ?? '1 year', + ] + await this.execute(sql, args, trx) + } + + + // #region common ------------------------------ + + protected async execute(sql: string, params: unknown[] | null, trx: Transaction | undefined | null): Promise { + if (trx) { + assert(! trx.isCompleted(), 'parameter trx is completed already') + } + const dbh = trx ?? this.dbh + try { + const res = await (params ? dbh.raw(sql, params) : dbh.raw(sql)) as T + return res + } + catch (ex) { + await trx?.rollback() + throw ex + } + } + + protected async startTransaction(): Promise { + const ret = await this.dbh.transaction() + assert(ret, 'Transaction is required') + return ret + } + +} + diff --git a/packages/pgmq-js/src/lib/partition/part.types.ts b/packages/pgmq-js/src/lib/partition/part.types.ts new file mode 100644 index 0000000..ef5cbd6 --- /dev/null +++ b/packages/pgmq-js/src/lib/partition/part.types.ts @@ -0,0 +1,72 @@ +/* c8 ignore start */ + +import type { QueueOptionsBase } from '../types.js' + + +export interface ShowPartitions { + partitionSchemaname: string + partitionTablename: string +} + +/** + * @link https://github.com/pgpartman/pg_partman/blob/development/doc/pg_partman.md#show_partitions + */ +export interface ShowPartitionsOptions extends QueueOptionsBase { + /** + * @default 'ASC' + */ + order?: 'ASC' | 'DESC' + /** + * @default false + */ + includeDefault?: boolean +} + +export enum PartMsg { + queueNotManaged = 'Given parent table not managed by pg_partman', +} + + +/** + * @link https://tembo-io.github.io/pgmq/api/sql/functions/#create_partitioned + * @link https://tembo-io.github.io/pgmq/#partitioned-queues + */ +export interface CreatePartitionedQueueMetaOptions extends QueueOptionsBase { + /** + * Pgsql Duration (eg. '1 days') or number string (eg. '100000') + * @default '1 month' + */ + partitionInterval?: string + /** + * Pgsql Duration (eg. '1 year') or number string (eg. '10000000') + * @default '1 year' + */ + retentionInterval?: string + /** + * Maximum 512 characters + */ + queueKey?: string | null + json?: object | null +} + + +/** + * @link https://github.com/pgpartman/pg_partman/blob/development/doc/pg_partman.md#run_maintenance + */ +export interface RunMaintenanceOptions { + /** + * Must prefix with schema name, like 'public.my_queue' + */ + queue?: string + /** + * @default false + */ + analyze?: boolean + /** + * @default true + */ + jobmon?: boolean +} + + +/* c8 ignore stop */ diff --git a/packages/pgmq-js/src/lib/pgmq.ts b/packages/pgmq-js/src/lib/pgmq.ts index 2fb4e2b..6a0e3bd 100644 --- a/packages/pgmq-js/src/lib/pgmq.ts +++ b/packages/pgmq-js/src/lib/pgmq.ts @@ -5,6 +5,7 @@ import { initDbConfigPart, initDbConnectionConfig } from './config.js' import { type RespCommon, parseRespCommon } from './helper.js' import type { Transaction } from './knex.types.js' import { type MsgContent, type MsgId, type SendOptions, MsgManager } from './msg-manager/index.msg.js' +import { Partition } from './partition/index.part.js' import { QueueManager } from './queue-manager/index.queue.js' import { QueueMetaManager } from './queue-meta-manager/index.queue-meta.js' import { type SendRouteMsgOptions, type SendRouteMsgResultItem, RouteMsg } from './route-msg/index.route-msg.js' @@ -18,7 +19,8 @@ export class Pgmq { public readonly msg: MsgManager public readonly router: Router public readonly routeMsg: RouteMsg - protected readonly dbh: Knex + public readonly partition: Partition + public readonly dbh: Knex protected readonly dbConfig: DbConfig constructor( @@ -29,6 +31,7 @@ export class Pgmq { this.dbh = createDbh(this.dbConfig) this.queueMeta = new QueueMetaManager(this.dbh) + this.partition = new Partition(this.dbh, this.queueMeta) this.queue = new QueueManager(this.dbh, this.queueMeta) this.msg = new MsgManager(this.dbh, this.queue) this.router = new Router(this.dbh, this.queueMeta) diff --git a/packages/pgmq-js/test/lib/30.queue-manager/35.queue.detachArchive.test.ts b/packages/pgmq-js/test/lib/30.queue-manager/35.queue.detachArchive.test.ts index 41872f9..58e50ea 100644 --- a/packages/pgmq-js/test/lib/30.queue-manager/35.queue.detachArchive.test.ts +++ b/packages/pgmq-js/test/lib/30.queue-manager/35.queue.detachArchive.test.ts @@ -26,7 +26,6 @@ describe(fileShortPath(import.meta.url), () => { }) after(async () => { // await mq.queue.drop(createOpts) // queue will not be dropped case of archived - // @ts-expect-error await mq.dbh.raw(`DROP TABLE IF EXISTS pgmq.a_${createOpts.queue}`) // await mq.dbh.raw(`DROP TABLE IF EXISTS pgmq.q_${createOpts.queue}`) // queue will not be dropped case of archived await mq.destroy() diff --git a/packages/pgmq-js/test/lib/300.partition/300.partition.createPartitioned.test.ts b/packages/pgmq-js/test/lib/300.partition/300.partition.createPartitioned.test.ts new file mode 100644 index 0000000..122195e --- /dev/null +++ b/packages/pgmq-js/test/lib/300.partition/300.partition.createPartitioned.test.ts @@ -0,0 +1,34 @@ +import assert from 'node:assert' + +import { fileShortPath, sleep } from '@waiting/shared-core' + +import { type CreatePartitionedQueueMetaOptions, Pgmq, genRandomName } from '##/index.js' +import { dbConfig } from '#@/config.unittest.js' + + +const rndString = genRandomName(6) +// const rndString = 'abc0ba' + +describe(fileShortPath(import.meta.url), () => { + let mq: Pgmq + const createOpts: CreatePartitionedQueueMetaOptions = { + queue: rndString, + partitionInterval: '1min', + retentionInterval: '2mins', + } + + before(async () => { + mq = new Pgmq('test', dbConfig) + }) + after(async () => { + await mq.queue.drop(createOpts) + await mq.destroy() + }) + + describe(`Partition.createPartitioned(${rndString})`, () => { + it(`normal`, async () => { + await mq.partition.createPartitioned(createOpts) + }) + }) +}) + diff --git a/packages/pgmq-js/test/lib/300.partition/301.partition.runMaintenance.test.ts b/packages/pgmq-js/test/lib/300.partition/301.partition.runMaintenance.test.ts new file mode 100644 index 0000000..30470c3 --- /dev/null +++ b/packages/pgmq-js/test/lib/300.partition/301.partition.runMaintenance.test.ts @@ -0,0 +1,78 @@ +import assert from 'node:assert' + +import { fileShortPath, sleep } from '@waiting/shared-core' + +import { + type CreatePartitionedQueueMetaOptions, + type RunMaintenanceOptions, + type ShowPartitionsOptions, + Pgmq, genRandomName, +} from '##/index.js' +import { dbConfig } from '#@/config.unittest.js' + + +describe(fileShortPath(import.meta.url), () => { + let mq: Pgmq + const rndString = genRandomName(6) + const createOpts: CreatePartitionedQueueMetaOptions = { + queue: rndString, + } + const options: ShowPartitionsOptions = { queue: rndString } + + before(async () => { + mq = new Pgmq('test', dbConfig) + const sql = 'SELECT extname, extversion FROM pg_extension;' + const resp = await mq.dbh.raw(sql) as unknown + // @ts-ignore + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + console.log({ resp: resp.rows }) + + const sql2 = 'SELECT current_schema();' + const resp2 = await mq.dbh.raw(sql2) as unknown + // @ts-ignore + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + console.log({ resp2: resp2.rows }) + + const sql3 = `SELECT proname, nspname +FROM pg_catalog.pg_proc +JOIN pg_catalog.pg_namespace ON pg_proc.pronamespace = pg_namespace.oid +WHERE proname = 'run_maintenance'` + + const resp3 = await mq.dbh.raw(sql3) as unknown + // @ts-ignore + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + console.log({ resp3: resp3.rows }) + + // const path = 'SET search_path TO "$user", public, pgmq' + // await mq.dbh.raw(path) + + const sql4 = 'SHOW search_path' + const resp4 = await mq.dbh.raw(sql4) as unknown + // @ts-ignore + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + console.log({ resp4: resp4.rows }) + + // const sql5 = 'SELECT run_maintenance()' + // await mq.dbh.raw(sql5) + + await mq.partition.createPartitioned(createOpts) + }) + after(async () => { + await mq.queue.drop(createOpts) + await mq.destroy() + }) + + describe(`Partition`, () => { + + it(`runMaintenance()`, async () => { + await mq.partition.runMaintenance() + }) + + it(`runMaintenance(pgmq.q_${rndString})`, async () => { + const opts: RunMaintenanceOptions = { queue: `pgmq.q_${rndString}` } + await mq.partition.runMaintenance(opts) + }) + + }) +}) + diff --git a/packages/pgmq-js/test/lib/300.partition/303.partition.showPartitions.test.ts b/packages/pgmq-js/test/lib/300.partition/303.partition.showPartitions.test.ts new file mode 100644 index 0000000..9e05858 --- /dev/null +++ b/packages/pgmq-js/test/lib/300.partition/303.partition.showPartitions.test.ts @@ -0,0 +1,72 @@ +import assert from 'node:assert' + +import { fileShortPath, sleep } from '@waiting/shared-core' + +import { type CreatePartitionedQueueMetaOptions, type ShowPartitionsOptions, PartMsg, Pgmq, genRandomName } from '##/index.js' +import { dbConfig } from '#@/config.unittest.js' + + +describe(fileShortPath(import.meta.url), () => { + let mq: Pgmq + const rndString = genRandomName(6) + // const rndString = 'abc0ba' + const retention = 2 + const createOpts: CreatePartitionedQueueMetaOptions = { + queue: rndString, + partitionInterval: '1 month', + retentionInterval: `${retention} months`, + } + const options: ShowPartitionsOptions = { queue: rndString } + + before(async () => { + mq = new Pgmq('test', dbConfig) + }) + after(async () => { + await mq.destroy() + }) + + describe(`Partition`, () => { + + it(`showPartitions(fakeQueueName) should throw "${PartMsg.queueNotManaged}" with fake name`, async () => { + const opts: ShowPartitionsOptions = { queue: 'FAKE' } + try { + await mq.partition.showPartitions(opts) + } + catch (ex) { + assert(ex instanceof Error) + assert(ex.message.includes(PartMsg.queueNotManaged), ex.message) + assert(ex.message.includes(opts.queue.toLocaleLowerCase()), ex.message) + await mq.queue.drop(options) + return + } + assert(false, 'should throw') + }) + + it(`showPartitions(q_${rndString}) should throw "${PartMsg.queueNotManaged}" with queue not partitioned`, async () => { + await mq.queue.create(options) + const opts: ShowPartitionsOptions = { queue: `pgmq.q_${rndString}` } + try { + await mq.partition.showPartitions(opts) + } + catch (ex) { + assert(ex instanceof Error) + assert(ex.message.includes(PartMsg.queueNotManaged), ex.message) + assert(ex.message.includes(opts.queue.toLocaleLowerCase()), ex.message) + await mq.queue.drop(options) + return + } + assert(false, 'should throw') + }) + + it(`showPartitions(q_${rndString}) should return array with queue partitioned`, async () => { + await mq.partition.createPartitioned(createOpts) + const opts: ShowPartitionsOptions = { queue: `pgmq.q_${rndString}` } + const info = await mq.partition.showPartitions(opts) + assert(info.length > 0, 'should return partitions') + console.log({ info }) + await mq.queue.drop(options) + }) + + }) +}) + diff --git a/packages/pgmq-js/test/lib/300.partition/304.partition.showPartitions.maintaince.test.ts b/packages/pgmq-js/test/lib/300.partition/304.partition.showPartitions.maintaince.test.ts new file mode 100644 index 0000000..66b1d93 --- /dev/null +++ b/packages/pgmq-js/test/lib/300.partition/304.partition.showPartitions.maintaince.test.ts @@ -0,0 +1,67 @@ +import assert from 'node:assert' + +import { fileShortPath, sleep } from '@waiting/shared-core' + +import { type CreatePartitionedQueueMetaOptions, type ShowPartitionsOptions, PartMsg, Pgmq, genRandomName } from '##/index.js' +import { dbConfig } from '#@/config.unittest.js' + +/* RUN: +-- database/default/ddl/ci-config.sql +ALTER SYSTEM SET pg_partman_bgw.interval = 2 +SELECT pg_reload_conf(); +*/ + +describe(fileShortPath(import.meta.url), () => { + let mq: Pgmq + const rndString = genRandomName(6) + // const rndString = 'abc0ba' + const retention = 2 + const createOpts: CreatePartitionedQueueMetaOptions = { + queue: rndString, + partitionInterval: '1 month', + retentionInterval: `${retention} months`, + } + const options: ShowPartitionsOptions = { queue: rndString } + + before(async () => { + mq = new Pgmq('test', dbConfig) + }) + after(async () => { + await mq.destroy() + }) + + describe(`Partition`, () => { + it(`showPartitions(q_${rndString}) should return array with queue partitioned after runMaintenance with name`, async () => { + await mq.partition.createPartitioned(createOpts) + const opts: ShowPartitionsOptions = { queue: `pgmq.q_${rndString}` } + const info = await mq.partition.showPartitions(opts) + assert(info.length === 9, 'should return partitions') + + await mq.partition.runMaintenance({ queue: `pgmq.q_${rndString}` }) + await sleep(2000) + const info2 = await mq.partition.showPartitions(opts) + assert(info2.length === 7, 'should partitions maintained') + console.log({ info2 }) + + await mq.queue.drop(options) + }) + + it(`showPartitions(q_${rndString}) should return array with queue partitioned after runMaintenance`, async () => { + await mq.partition.createPartitioned(createOpts) + const opts: ShowPartitionsOptions = { queue: `pgmq.q_${rndString}` } + const info = await mq.partition.showPartitions(opts) + assert(info.length === 9, 'should return partitions') + + await mq.partition.runMaintenance() + await sleep(2000) + const info2 = await mq.partition.showPartitions(opts) + assert(info2.length === 7, 'should partitions maintained') + console.log({ info2 }) + + await mq.queue.drop(options) + }) + + }) + +}) + diff --git a/packages/pgmq-js/test/lib/50.msg-manager/61.msg.read_with_poll.test.ts b/packages/pgmq-js/test/lib/50.msg-manager/61.msg.read_with_poll.test.ts index 74722bd..9bb3f9f 100644 --- a/packages/pgmq-js/test/lib/50.msg-manager/61.msg.read_with_poll.test.ts +++ b/packages/pgmq-js/test/lib/50.msg-manager/61.msg.read_with_poll.test.ts @@ -50,7 +50,7 @@ describe(fileShortPath(import.meta.url), () => { ]) const cost = Date.now() - now console.info('cost:', cost) - assert(cost > 3000, `cost: ${cost}`) + assert(cost >= 3000, `cost: ${cost}`) assert(cost < 3200, `cost: ${cost}`) }) }) diff --git a/packages/pgmq-js/test/lib/50.msg-manager/71.msg.partition.test.ts b/packages/pgmq-js/test/lib/50.msg-manager/71.msg.partition.test.ts new file mode 100644 index 0000000..15c92bb --- /dev/null +++ b/packages/pgmq-js/test/lib/50.msg-manager/71.msg.partition.test.ts @@ -0,0 +1,90 @@ +import assert from 'node:assert' + +import { fileShortPath, sleep } from '@waiting/shared-core' + +import { type CreatePartitionedQueueMetaOptions, type DeleteBatchOptions, type QueueOptionsBase, type ReadBatchOptions, type SendBatchOptions, Pgmq, genRandomName } from '##/index.js' +import { dbConfig } from '#@/config.unittest.js' + + +const rndString = genRandomName(6) +// const rndString = 'abc0ba' + +describe(fileShortPath(import.meta.url), () => { + let mq: Pgmq + let index = 0 + const loop = 10 + const msgItem = { + foo: 'bar', + index, + } + const msgsToSend: (typeof msgItem)[] = [] + const msgs2ToSend: (typeof msgItem)[] = [] + for (let i = 0; i < loop; i++) { + msgsToSend.push({ ...msgItem, index }) + index += 1 + } + for (let i = 0; i < loop; i++) { + msgs2ToSend.push({ ...msgItem, index }) + index += 1 + } + const opts: SendBatchOptions = { + queue: rndString, + msgs: msgsToSend, + } + const opts2: SendBatchOptions = { + queue: rndString, + msgs: msgs2ToSend, + } + const createOpts: CreatePartitionedQueueMetaOptions = { + queue: rndString, + partitionInterval: '1min', + retentionInterval: '2mins', + } + + before(async () => { + mq = new Pgmq('test', dbConfig) + try { + await mq.partition.createPartitioned(createOpts) + } + catch (ex) { + console.log(ex) + } + console.log({ name: rndString }) + }) + after(async () => { + await mq.queue.drop(createOpts) + await mq.destroy() + }) + + it(`msg.sendBatch(${rndString}, msg[])`, async () => { + const msgIds = await mq.msg.sendBatch(opts) + assert(msgIds.length === loop, 'sendBatch failed len:' + msgIds.length) + console.log({ msgIds }) + const msgIds2 = await mq.msg.sendBatch(opts2) + assert(msgIds2.length === loop, 'sendBatch2 failed len:' + msgIds.length) + + const arOpts: DeleteBatchOptions = { + queue: rndString, + msgIds: [...msgIds], + } + const ids = await mq.msg.archiveBatch(arOpts) + console.log({ ids }) + }) + + it(`Validate q_${rndString}_default empty`, async () => { + const tbl = `pgmq.q_${rndString}_default` + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + const resp = await mq.dbh.raw(`SELECT * FROM ${tbl}`) + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + assert(resp.rowCount === 0, 'rowCount not 0') + }) + + it(`Validate a_${rndString}_default empty`, async () => { + const tbl = `pgmq.a_${rndString}_default` + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + const resp = await mq.dbh.raw(`SELECT * FROM ${tbl}`) + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + assert(resp.rowCount === 0, 'rowCount not 0') + }) +}) +