From 81448abd3eb60c317597db12b89b4d3988b27bd3 Mon Sep 17 00:00:00 2001 From: Alexandre Kalendarev Date: Tue, 5 Nov 2024 16:33:12 +0300 Subject: [PATCH 1/2] automate create restory sync point --- src/backend/distributed/shared_library_init.c | 21 +++++++++++++++++++ src/backend/distributed/utils/maintenanced.c | 6 ++++++ 2 files changed, 27 insertions(+) diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index bd65fa60c01..e199398b6f5 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -112,6 +112,7 @@ #include "distributed/transaction_recovery.h" #include "distributed/utils/citus_stat_tenants.h" #include "distributed/utils/directory.h" +#include "distributed/utils/restore_interval.h" #include "distributed/worker_log_messages.h" #include "distributed/worker_manager.h" #include "distributed/worker_protocol.h" @@ -2678,6 +2679,26 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); + DefineCustomStringVariable( + "citus.restore_point_interval", + gettext_noop("Sets the timeout for periodic recovery cluster point"), + NULL, + &RestorePointInterval, + "never", + PGC_SIGHUP, + GUC_STANDARD, + GucCheckInterval, NULL, NULL); + + DefineCustomStringVariable( + "citus.restore_point_interval_name", + gettext_noop("Sets the prefix of pointname for the periodic recovery cluster point"), + NULL, + &RestorePointIntervalName, + NULL, + PGC_SIGHUP, + GUC_STANDARD, + NULL, NULL, NULL); + /* warn about config items in the citus namespace that are not registered above */ EmitWarningsOnPlaceholders("citus"); diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index 9cef13539b4..63e3a55d6b9 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -59,6 +59,7 @@ #include "distributed/shard_cleaner.h" #include "distributed/statistics_collection.h" #include "distributed/transaction_recovery.h" +#include "distributed/utils/restore_interval.h" #include "distributed/version_compat.h" /* @@ -955,6 +956,11 @@ CitusMaintenanceDaemonMain(Datum main_arg) */ } + if ( restorePointIntervalMode && IsCoordinator() && !RecoveryInProgress()) + { + CheckRestoreInterval(MyDatabaseId, myDbData->userOid); + } + if (got_SIGHUP) { got_SIGHUP = false; From f014d8fd733986adeb84a98f6747f385bcbc25bb Mon Sep 17 00:00:00 2001 From: Alexandre Kalendarev Date: Tue, 5 Nov 2024 16:34:42 +0300 Subject: [PATCH 2/2] automate create restory sync point new files --- .../distributed/utils/restore_interval.c | 255 ++++++++++++++++++ .../distributed/utils/restore_interval.h | 37 +++ 2 files changed, 292 insertions(+) create mode 100644 src/backend/distributed/utils/restore_interval.c create mode 100644 src/include/distributed/utils/restore_interval.h diff --git a/src/backend/distributed/utils/restore_interval.c b/src/backend/distributed/utils/restore_interval.c new file mode 100644 index 00000000000..067568c9979 --- /dev/null +++ b/src/backend/distributed/utils/restore_interval.c @@ -0,0 +1,255 @@ +/*------------------------------------------------------------------------- + * + * util/restore_interval.c + * Subroutines related to the periodic sync save checkpoint. + * + *------------------------------------------------------------------------- + */ + +#include +#include +#include +#include +#include + +#include "c.h" +#include "postgres.h" +#include "fmgr.h" +#include "libpq-fe.h" + +#include "miscadmin.h" +#include "pgstat.h" + +#include "commands/dbcommands.h" +#include "distributed/listutils.h" +#include "distributed/citus_safe_lib.h" +#include "distributed/connection_management.h" +#include "distributed/coordinator_protocol.h" +#include "distributed/metadata_utility.h" +#include "distributed/metadata_cache.h" +#include "distributed/remote_commands.h" +#include "distributed/resource_lock.h" +#include "distributed/worker_manager.h" +#include "distributed/utils/restore_interval.h" +#include "executor/spi.h" +#include "lib/stringinfo.h" +#include "nodes/pg_list.h" +#include "postmaster/bgworker.h" +#include "storage/ipc.h" +#include "utils/backend_status.h" +#include "utils/builtins.h" + + +char *RestorePointInterval = NULL; +char *RestorePointIntervalName = NULL; + +RestorePointIntervalMode restorePointIntervalMode = RESTOREPOINT_INTERVAL_NEVER; + +/* the flags for first time of execute proc */ +static bool is_first_inday = true; +static bool is_first_inhour = true; +static bool got_SIGALRM = false; +static bool got_SIGTERM = false; + + +static void SendRestorePointCmd(Oid DatabaseId, Oid UserOid); + +void send_sql_restorepoint_cmd(Datum main_arg); + +static void MetadataSyncSigAlrmHandler(SIGNAL_ARGS); +static void MetadataSyncSigTermHandler(SIGNAL_ARGS); + + +void +send_sql_restorepoint_cmd(Datum main_arg) +{ + StringInfoData sql; + pg_time_t t = time(NULL); + struct tm * tt = localtime(&t); + Oid databaseOid = DatumGetObjectId(main_arg); + Oid extensionOwner = InvalidOid; + int spiStatus; + + initStringInfo(&sql); + if (RestorePointIntervalName != NULL) + appendStringInfo(&sql,"SELECT citus_create_restore_point('%s_%02d.%02d_%02d:00')", + RestorePointIntervalName, tt->tm_mon, tt->tm_mday, tt->tm_hour); + else + appendStringInfo(&sql,"SELECT citus_create_restore_point('restore_autosave_%02d.%02d_%02d:00')", + tt->tm_mon, tt->tm_mday, tt->tm_hour); + + + /* extension owner is passed via bgw_extra */ + memcpy_s(&extensionOwner, sizeof(extensionOwner), + MyBgworkerEntry->bgw_extra, sizeof(Oid)); + + pqsignal(SIGTERM, MetadataSyncSigTermHandler); + pqsignal(SIGALRM, MetadataSyncSigAlrmHandler); + BackgroundWorkerUnblockSignals(); + + BackgroundWorkerInitializeConnectionByOid(databaseOid, extensionOwner,BGWORKER_BYPASS_ALLOWCONN); + + /* + * Start transaction. + */ + SetCurrentStatementStartTimestamp(); + StartTransactionCommand(); + SPI_connect(); + PushActiveSnapshot(GetTransactionSnapshot()); + pgstat_report_activity(STATE_RUNNING, sql.data); + pgstat_report_appname("restore point"); + + spiStatus = SPI_exec(sql.data, 1); + if (spiStatus != SPI_OK_SELECT) + elog(FATAL, "cannot cancel competing backends for backend %d", getpid()); + + /* + * And finish our transaction. + */ + SPI_finish(); + PopActiveSnapshot(); + CommitTransactionCommand(); + pgstat_report_stat(false); + pgstat_report_activity(STATE_IDLE, NULL); + + proc_exit(0); +} + + +static void +SendRestorePointCmd(Oid DatabaseId, Oid UserOid) +{ + BackgroundWorkerHandle *handle = NULL; + BackgroundWorker worker; + + worker.bgw_flags = BGWORKER_BACKEND_DATABASE_CONNECTION | BGWORKER_SHMEM_ACCESS; + strcpy(worker.bgw_name,"Citus Recovery Background"); + strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus"); + strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name), + "send_sql_restorepoint_cmd"); + strcpy(worker.bgw_type, "pg_restore_interval"); + + worker.bgw_main_arg = ObjectIdGetDatum(DatabaseId); + memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &UserOid, + sizeof(UserOid)); + + worker.bgw_start_time = BgWorkerStart_ConsistentState; + worker.bgw_restart_time = BGW_NEVER_RESTART; + + BackgroundWorkerUnblockSignals(); + + if (!RegisterDynamicBackgroundWorker(&worker, &handle)) + { + elog(LOG, "error of RegisterDynamicBackgroundWorker "); + return; + } +} + + +void CheckRestoreInterval(Oid databaseId, Oid userOid) +{ + pg_time_t t = time(NULL); + struct tm * tt = localtime(&t); + + if (is_first_inday == false) + { + if (tt->tm_hour) + is_first_inday = true; + } + + + if (restorePointIntervalMode == RESTOREPOINT_INTERVAL_DAILY) + { + if (is_first_inday && tt->tm_hour == 0 && tt->tm_min == 0) + { + is_first_inday = false; + SendRestorePointCmd(databaseId, userOid); + return; + } + } + + + if (is_first_inhour == false) + { + if (tt->tm_min) + is_first_inhour = true; + } + + if (restorePointIntervalMode == RESTOREPOINT_INTERVAL_HOURLY) + { + if ( tt->tm_min == 0 && is_first_inhour) + { + is_first_inhour = false; + SendRestorePointCmd(databaseId, userOid); + return; + } + } + + +} + + +bool +GucCheckInterval(char **newval, void **extra, GucSource source) +{ + if (newval == NULL) + return false; + + if (*newval == NULL) + return true; + + if (strcmp(*newval,"daily") == 0) + { + restorePointIntervalMode = RESTOREPOINT_INTERVAL_DAILY; + return true; + } + + if (strcmp(*newval,"hourly") == 0) + { + restorePointIntervalMode = RESTOREPOINT_INTERVAL_HOURLY; + return true; + } + + if (strcmp(*newval,"never") == 0) + return true; + + GUC_check_errdetail("The dataime format is dayly hourly or never"); + return false; +} + + +/* + * MetadataSyncSigAlrmHandler set a flag to request error at metadata + * sync daemon. This is used for testing purposes. + */ +static void +MetadataSyncSigAlrmHandler(SIGNAL_ARGS) +{ + int save_errno = errno; + + got_SIGALRM = true; + if (MyProc != NULL) + { + SetLatch(&MyProc->procLatch); + } + + errno = save_errno; +} + +/* + * MetadataSyncSigTermHandler set a flag to request termination of metadata + * sync daemon. + */ +static void +MetadataSyncSigTermHandler(SIGNAL_ARGS) +{ + int save_errno = errno; + + got_SIGTERM = true; + if (MyProc != NULL) + { + SetLatch(&MyProc->procLatch); + } + + errno = save_errno; +} \ No newline at end of file diff --git a/src/include/distributed/utils/restore_interval.h b/src/include/distributed/utils/restore_interval.h new file mode 100644 index 00000000000..a306cf466b0 --- /dev/null +++ b/src/include/distributed/utils/restore_interval.h @@ -0,0 +1,37 @@ +/*------------------------------------------------------------------------- + * + * distributed/utils/restore_interval.h + * Subroutines related to the save checkpoint by time. + * + *------------------------------------------------------------------------- + */ + +#ifndef RESTORE_POINT_INTERVAL_H +#define RESTORE_POINT_INTERVAL_H + +#include "utils/guc.h" + + +typedef struct citus_tm +{ + int8 ss; + int8 mm; + int8 hh; +} citus_tm; + +typedef enum RestorePointIntervalMode { + RESTOREPOINT_INTERVAL_NEVER = 0, + RESTOREPOINT_INTERVAL_HOURLY, + RESTOREPOINT_INTERVAL_DAILY +} RestorePointIntervalMode; + + +extern char *RestorePointInterval; +extern char *RestorePointIntervalName; +extern RestorePointIntervalMode restorePointIntervalMode; + + +void InitializeRecoveryDaemon(void); +bool GucCheckInterval(char **newval, void **extra, GucSource source); +void CheckRestoreInterval(Oid DatabaseId, Oid UserOid); +#endif /* RESTORE_POINT_INTERVAL_H */ \ No newline at end of file