Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automate create sync restore point #7723

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/backend/distributed/shared_library_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
#include "distributed/transaction_recovery.h"
#include "distributed/utils/citus_stat_tenants.h"
#include "distributed/utils/directory.h"
#include "distributed/utils/restore_interval.h"
#include "distributed/worker_log_messages.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h"
Expand Down Expand Up @@ -2678,6 +2679,26 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD,
NULL, NULL, NULL);

DefineCustomStringVariable(
"citus.restore_point_interval",
gettext_noop("Sets the timeout for periodic recovery cluster point"),
NULL,
&RestorePointInterval,
"never",
PGC_SIGHUP,
GUC_STANDARD,
GucCheckInterval, NULL, NULL);

DefineCustomStringVariable(
"citus.restore_point_interval_name",
gettext_noop("Sets the prefix of pointname for the periodic recovery cluster point"),
NULL,
&RestorePointIntervalName,
NULL,
PGC_SIGHUP,
GUC_STANDARD,
NULL, NULL, NULL);

/* warn about config items in the citus namespace that are not registered above */
EmitWarningsOnPlaceholders("citus");

Expand Down
6 changes: 6 additions & 0 deletions src/backend/distributed/utils/maintenanced.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
#include "distributed/shard_cleaner.h"
#include "distributed/statistics_collection.h"
#include "distributed/transaction_recovery.h"
#include "distributed/utils/restore_interval.h"
#include "distributed/version_compat.h"

/*
Expand Down Expand Up @@ -955,6 +956,11 @@ CitusMaintenanceDaemonMain(Datum main_arg)
*/
}

if ( restorePointIntervalMode && IsCoordinator() && !RecoveryInProgress())
{
CheckRestoreInterval(MyDatabaseId, myDbData->userOid);
}

if (got_SIGHUP)
{
got_SIGHUP = false;
Expand Down
255 changes: 255 additions & 0 deletions src/backend/distributed/utils/restore_interval.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
/*-------------------------------------------------------------------------
*
* utils/restore_interval.c
* Subroutines for periodically creating cluster-wide restore points.
*
*-------------------------------------------------------------------------
*/

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

#include "c.h"
#include "postgres.h"
#include "fmgr.h"
#include "libpq-fe.h"

#include "miscadmin.h"
#include "pgstat.h"

#include "commands/dbcommands.h"
#include "distributed/listutils.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/connection_management.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/metadata_utility.h"
#include "distributed/metadata_cache.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/worker_manager.h"
#include "distributed/utils/restore_interval.h"
#include "executor/spi.h"
#include "lib/stringinfo.h"
#include "nodes/pg_list.h"
#include "postmaster/bgworker.h"
#include "storage/ipc.h"
#include "utils/backend_status.h"
#include "utils/builtins.h"


/* values of citus.restore_point_interval and citus.restore_point_interval_name */
char *RestorePointInterval = NULL;
char *RestorePointIntervalName = NULL;

/* parsed form of RestorePointInterval, maintained by GucCheckInterval */
RestorePointIntervalMode restorePointIntervalMode = RESTOREPOINT_INTERVAL_NEVER;

/*
 * Re-arm flags used by CheckRestoreInterval: true means the next matching
 * midnight (daily) or top of the hour (hourly) has not been handled yet.
 */
static bool is_first_inday = true;
static bool is_first_inhour = true;

/*
 * Flags written from signal handlers. They must be volatile sig_atomic_t so
 * that stores from the handler are well defined and not optimized away.
 */
static volatile sig_atomic_t got_SIGALRM = false;
static volatile sig_atomic_t got_SIGTERM = false;


static void SendRestorePointCmd(Oid DatabaseId, Oid UserOid);

/* background worker entry point, looked up by name when the worker starts */
void send_sql_restorepoint_cmd(Datum main_arg);

static void MetadataSyncSigAlrmHandler(SIGNAL_ARGS);
static void MetadataSyncSigTermHandler(SIGNAL_ARGS);


/*
 * send_sql_restorepoint_cmd is the main function of the dynamic background
 * worker registered by SendRestorePointCmd. It connects to the database whose
 * OID is passed in main_arg, as the user whose OID is stored in bgw_extra,
 * runs citus_create_restore_point() once through SPI and then exits.
 *
 * The restore point is named "<prefix>_MM.DD_HH:00" from the local time at
 * worker start, where <prefix> is RestorePointIntervalName
 * (citus.restore_point_interval_name) or "restore_autosave" when it is unset.
 */
void
send_sql_restorepoint_cmd(Datum main_arg)
{
	Oid databaseOid = DatumGetObjectId(main_arg);
	Oid extensionOwner = InvalidOid;
	const char *namePrefix = "restore_autosave";
	StringInfoData pointName;
	StringInfoData sql;
	struct tm startTime;
	int spiStatus;

	/*
	 * Take the timestamp first so the name reflects when the worker started.
	 * localtime() returns a pointer to static storage, so copy the result
	 * before calling anything else.
	 */
	time_t now = time(NULL);
	struct tm *localNow = localtime(&now);
	if (localNow == NULL)
	{
		elog(FATAL, "could not convert current time to local time");
	}
	startTime = *localNow;

	/* extension owner is passed via bgw_extra */
	memcpy_s(&extensionOwner, sizeof(extensionOwner),
			 MyBgworkerEntry->bgw_extra, sizeof(Oid));

	/*
	 * NOTE(review): these handlers only set a flag and the latch, and nothing
	 * in this worker reads the flags, so SIGTERM does not appear to interrupt
	 * a running citus_create_restore_point() call - confirm this is intended.
	 */
	pqsignal(SIGTERM, MetadataSyncSigTermHandler);
	pqsignal(SIGALRM, MetadataSyncSigAlrmHandler);
	BackgroundWorkerUnblockSignals();

	BackgroundWorkerInitializeConnectionByOid(databaseOid, extensionOwner,
											  BGWORKER_BYPASS_ALLOWCONN);

	/*
	 * Start transaction.
	 */
	SetCurrentStatementStartTimestamp();
	StartTransactionCommand();
	if (SPI_connect() != SPI_OK_CONNECT)
	{
		elog(FATAL, "could not connect to SPI manager");
	}
	PushActiveSnapshot(GetTransactionSnapshot());

	if (RestorePointIntervalName != NULL)
	{
		namePrefix = RestorePointIntervalName;
	}

	/* struct tm months are 0-based, hence the + 1 */
	initStringInfo(&pointName);
	appendStringInfo(&pointName, "%s_%02d.%02d_%02d:00",
					 namePrefix, startTime.tm_mon + 1, startTime.tm_mday,
					 startTime.tm_hour);

	/*
	 * The prefix comes from a configuration setting; quote the whole name as
	 * a SQL literal so that quotes or backslashes in it cannot break out of
	 * the string.
	 */
	initStringInfo(&sql);
	appendStringInfo(&sql, "SELECT citus_create_restore_point(%s)",
					 quote_literal_cstr(pointName.data));

	pgstat_report_activity(STATE_RUNNING, sql.data);
	pgstat_report_appname("restore point");

	spiStatus = SPI_exec(sql.data, 1);
	if (spiStatus != SPI_OK_SELECT)
	{
		elog(FATAL, "could not create restore point \"%s\": %s",
			 pointName.data, SPI_result_code_string(spiStatus));
	}

	/*
	 * And finish our transaction.
	 */
	SPI_finish();
	PopActiveSnapshot();
	CommitTransactionCommand();
	pgstat_report_stat(false);
	pgstat_report_activity(STATE_IDLE, NULL);

	proc_exit(0);
}


/*
 * SendRestorePointCmd registers a one-shot dynamic background worker that
 * runs send_sql_restorepoint_cmd for the given database.
 *
 * DatabaseId is handed to the worker through bgw_main_arg and UserOid through
 * bgw_extra. The worker is never restarted. A registration failure is only
 * logged; the caller is not informed.
 */
static void
SendRestorePointCmd(Oid DatabaseId, Oid UserOid)
{
	BackgroundWorkerHandle *handle = NULL;
	BackgroundWorker worker;

	/*
	 * Zero the whole struct so that fields we do not set explicitly (such as
	 * bgw_notify_pid and the remainder of bgw_extra) have defined values.
	 */
	memset(&worker, 0, sizeof(worker));

	worker.bgw_flags = BGWORKER_BACKEND_DATABASE_CONNECTION | BGWORKER_SHMEM_ACCESS;
	worker.bgw_start_time = BgWorkerStart_ConsistentState;
	worker.bgw_restart_time = BGW_NEVER_RESTART;

	/* bound every copy by the size of its own destination buffer */
	strcpy_s(worker.bgw_name, sizeof(worker.bgw_name),
			 "Citus Recovery Background");
	strcpy_s(worker.bgw_type, sizeof(worker.bgw_type),
			 "pg_restore_interval");
	strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name),
			 "citus");
	strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_function_name),
			 "send_sql_restorepoint_cmd");

	worker.bgw_main_arg = ObjectIdGetDatum(DatabaseId);
	memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &UserOid,
			 sizeof(UserOid));

	/* NOTE(review): runs in the registering process, not the new worker - confirm it is needed */
	BackgroundWorkerUnblockSignals();

	if (!RegisterDynamicBackgroundWorker(&worker, &handle))
	{
		elog(LOG, "error of RegisterDynamicBackgroundWorker ");
		return;
	}

	/* we never wait for or terminate the worker, so the handle is not needed */
	if (handle != NULL)
	{
		pfree(handle);
	}
}


/*
 * CheckRestoreInterval starts a restore point worker when the configured
 * schedule is due. It is meant to be called repeatedly (the maintenance
 * daemon loop does so) and decides based on the current local time:
 *
 *  - daily mode fires once when a call observes 00:00 local time
 *  - hourly mode fires once when a call observes minute 0 of an hour
 *
 * is_first_inday / is_first_inhour keep the trigger from firing more than once
 * within the matching minute; they are re-armed as soon as a call sees a
 * non-zero hour or minute respectively. A trigger is missed if no call happens
 * during the matching minute.
 */
void
CheckRestoreInterval(Oid databaseId, Oid userOid)
{
	time_t now = time(NULL);
	struct tm *localNow = localtime(&now);

	if (localNow == NULL)
	{
		/* local time is unavailable; try again on the next call */
		return;
	}

	/* re-arm the daily trigger once we are past the midnight hour */
	if (!is_first_inday && localNow->tm_hour != 0)
	{
		is_first_inday = true;
	}

	if (restorePointIntervalMode == RESTOREPOINT_INTERVAL_DAILY &&
		is_first_inday && localNow->tm_hour == 0 && localNow->tm_min == 0)
	{
		is_first_inday = false;
		SendRestorePointCmd(databaseId, userOid);
		return;
	}

	/* re-arm the hourly trigger once we are past minute zero */
	if (!is_first_inhour && localNow->tm_min != 0)
	{
		is_first_inhour = true;
	}

	if (restorePointIntervalMode == RESTOREPOINT_INTERVAL_HOURLY &&
		localNow->tm_min == 0 && is_first_inhour)
	{
		is_first_inhour = false;
		SendRestorePointCmd(databaseId, userOid);
	}
}


/*
 * GucCheckInterval is the check hook of citus.restore_point_interval.
 *
 * It accepts "daily", "hourly" and "never" (exact, case-sensitive match) and
 * stores the matching value in restorePointIntervalMode. A NULL value is
 * accepted and treated like "never". Any other value is rejected with an
 * error detail.
 *
 * Returns true when the new value is acceptable, false otherwise. extra and
 * source are unused.
 *
 * NOTE(review): the mode is updated here because the setting is registered
 * without an assign hook. Check hooks are normally expected to be free of
 * side effects; moving the update to an assign hook would be cleaner.
 */
bool
GucCheckInterval(char **newval, void **extra, GucSource source)
{
	if (newval == NULL)
	{
		return false;
	}

	/*
	 * "never" has to reset the mode explicitly, otherwise a previously
	 * configured daily/hourly schedule would stay active.
	 */
	if (*newval == NULL || strcmp(*newval, "never") == 0)
	{
		restorePointIntervalMode = RESTOREPOINT_INTERVAL_NEVER;
		return true;
	}

	if (strcmp(*newval, "daily") == 0)
	{
		restorePointIntervalMode = RESTOREPOINT_INTERVAL_DAILY;
		return true;
	}

	if (strcmp(*newval, "hourly") == 0)
	{
		restorePointIntervalMode = RESTOREPOINT_INTERVAL_HOURLY;
		return true;
	}

	GUC_check_errdetail("Valid values are \"daily\", \"hourly\" and \"never\".");
	return false;
}


/*
 * MetadataSyncSigAlrmHandler is the SIGALRM handler installed by the restore
 * point worker. It sets got_SIGALRM and, when MyProc is available, sets the
 * process latch. errno is preserved across the handler.
 */
static void
MetadataSyncSigAlrmHandler(SIGNAL_ARGS)
{
	const int savedErrno = errno;

	got_SIGALRM = true;

	if (MyProc == NULL)
	{
		/* no latch to set yet */
		errno = savedErrno;
		return;
	}

	SetLatch(&MyProc->procLatch);
	errno = savedErrno;
}

/*
 * MetadataSyncSigTermHandler is the SIGTERM handler installed by the restore
 * point worker. It sets got_SIGTERM and, when MyProc is available, sets the
 * process latch. errno is preserved across the handler.
 */
static void
MetadataSyncSigTermHandler(SIGNAL_ARGS)
{
	const int savedErrno = errno;

	got_SIGTERM = true;

	if (MyProc == NULL)
	{
		/* no latch to set yet */
		errno = savedErrno;
		return;
	}

	SetLatch(&MyProc->procLatch);
	errno = savedErrno;
}
37 changes: 37 additions & 0 deletions src/include/distributed/utils/restore_interval.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*-------------------------------------------------------------------------
*
* distributed/utils/restore_interval.h
* Declarations for periodically creating cluster-wide restore points.
*
*-------------------------------------------------------------------------
*/

#ifndef RESTORE_POINT_INTERVAL_H
#define RESTORE_POINT_INTERVAL_H

#include "utils/guc.h"


/*
 * citus_tm holds three small integer fields.
 *
 * NOTE(review): presumably seconds / minutes / hours of a time of day. The
 * struct is not referenced by the code visible in this change - confirm it is
 * still needed.
 */
typedef struct citus_tm
{
int8 ss;
int8 mm;
int8 hh;
} citus_tm;

typedef enum RestorePointIntervalMode {
RESTOREPOINT_INTERVAL_NEVER = 0,
RESTOREPOINT_INTERVAL_HOURLY,
RESTOREPOINT_INTERVAL_DAILY
} RestorePointIntervalMode;


/* string values of citus.restore_point_interval and citus.restore_point_interval_name */
extern char *RestorePointInterval;
extern char *RestorePointIntervalName;
/* schedule derived from RestorePointInterval */
extern RestorePointIntervalMode restorePointIntervalMode;


/* NOTE(review): no definition of InitializeRecoveryDaemon is visible in this change - confirm it exists */
void InitializeRecoveryDaemon(void);
/* GUC check hook for citus.restore_point_interval; returns true if *newval is acceptable */
bool GucCheckInterval(char **newval, void **extra, GucSource source);
/* starts a restore point worker for the given database/user when the schedule is due */
void CheckRestoreInterval(Oid DatabaseId, Oid UserOid);
#endif /* RESTORE_POINT_INTERVAL_H */