Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

better logging in _handle_slots #52

Merged
merged 1 commit into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -1174,15 +1174,21 @@ def _handle_slots(self):

# create slots
slot_names = [helpers.app_name_from_fqdn(fqdn) for fqdn in slot_lock_holders]
actual_replication_slots = self.db.get_replication_slots()
if actual_replication_slots is None:
logging.warning('Failed to get actual replication slots')
# However, we can continue here and try to create slots. None of slots will be dropped, but some might be created
else:
logging.debug('Actual replication slots: %s', actual_replication_slots)

if not self.db.replication_slots('create', slot_names):
if not self.db.create_replication_slots(slot_names, verbose=False):
logging.warning('Could not create replication slots. %s', slot_names)

# drop slots
if my_hostname in non_holders_hosts:
non_holders_hosts.remove(my_hostname)
slot_names_to_drop = [helpers.app_name_from_fqdn(fqdn) for fqdn in non_holders_hosts]
if not self.db.replication_slots('drop', slot_names_to_drop):
if not self.db.drop_replication_slots(slot_names_to_drop, verbose=False):
logging.warning('Could not drop replication slots. %s', slot_names_to_drop)

def _get_db_state(self):
Expand Down Expand Up @@ -1348,7 +1354,7 @@ def _promote_handle_slots(self):
return False
# Create replication slots, regardless of whether replicas hold DCS locks for replication slots.
hosts = [helpers.app_name_from_fqdn(fqdn) for fqdn in hosts]
if not self.db.replication_slots('create', hosts):
if not self.db.create_replication_slots(hosts):
logging.error('Could not create replication slots. Releasing the lock in ZK.')
return False

Expand Down
36 changes: 20 additions & 16 deletions src/pg.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,17 @@ def _offline_detect_pgdata(self):
logging.error(line.rstrip())

@helpers.return_none_on_error
def _get_replication_slots(self):
def get_replication_slots(self):
res = self._exec_query('SELECT slot_name FROM pg_replication_slots;').fetchall()
return [i[0] for i in res]

def _create_replication_slot(self, slot_name):
logging.info('Creating slot %s.', slot_name)
query = f"SELECT pg_create_physical_replication_slot('{slot_name}', true)"
return self._exec_without_result(query)

def _drop_replication_slot(self, slot_name):
logging.info('Dropping slot %s.', slot_name)
query = f"SELECT pg_drop_replication_slot('{slot_name}')"
return self._exec_without_result(query)

Expand Down Expand Up @@ -736,24 +738,26 @@ def stop_postgresql(self, timeout=60):
logging.warning(line.rstrip())
return self._cmd_manager.stop_postgresql(timeout, self.pgdata)

def replication_slots(self, action, slots):
"""
Perform replication slots action (create/drop)
"""
current = self._get_replication_slots()
def create_replication_slots(self, slots, verbose=True):
current = self.get_replication_slots()
for slot in slots:
if action == 'create':
if current and slot in current:
if current and slot in current:
if verbose:
logging.debug('Slot %s already exists.', slot)
continue
if not self._create_replication_slot(slot):
return False
else:
if current is not None and slot not in current:
continue
if not self._create_replication_slot(slot):
return False
return True

def drop_replication_slots(self, slots, verbose=True):
current = self.get_replication_slots()
for slot in slots:
if current is not None and slot not in current:
if verbose:
logging.debug('Slot %s does not exist.', slot)
continue
if not self._drop_replication_slot(slot):
return False
continue
if not self._drop_replication_slot(slot):
return False
return True

def is_replaying_wal(self, check_time):
Expand Down