Better management of replication slots #30

Merged: 6 commits, Jul 31, 2024
4 changes: 4 additions & 0 deletions src/__init__.py
@@ -77,6 +77,8 @@ def read_config(filename=None, options=None):
'certfile': None,
'ca_cert': None,
'verify_certs': 'no',
'drop_slot_countdown': 10,
'replication_slots_polling': None,
},
'primary': {
'change_replication_type': 'yes',
@@ -131,6 +133,8 @@ def read_config(filename=None, options=None):
for key, value in defaults[section].items():
if not config.has_option(section, key):
config.set(section, key, value)
if config.get('global', 'replication_slots_polling') is None:
config.set('global', 'replication_slots_polling', config.get('global', 'use_replication_slots'))

#
# Rewriting global config with parameters from command line.
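For reference, this is roughly how the two new `[global]` options could look in the INI-style config that `read_config()` parses. Only the option names and their defaults come from the diff above; the surrounding keys and file layout are illustrative:

```ini
[global]
use_replication_slots = yes
# New: poll ZK for per-host replication-source locks and manage slots on every
# iteration. When left unset it inherits the value of use_replication_slots.
replication_slots_polling = yes
# New: number of iterations a host may go without holding its source lock
# before its replication slot is dropped.
drop_slot_countdown = 10
```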
115 changes: 88 additions & 27 deletions src/main.py
@@ -53,6 +53,7 @@ def __init__(self, **kwargs):
self.checks = {'primary_switch': 0, 'failover': 0, 'rewind': 0}
self._is_single_node = False
self.notifier = sdnotify.Notifier()
self._slot_drop_countdown = {}

if self.config.getboolean('global', 'quorum_commit'):
self._replication_manager = QuorumReplicationManager(
@@ -199,6 +200,11 @@ def start(self):
"""
Start iterations
"""
if (not self.config.getboolean('global', 'use_replication_slots') and
self.config.getboolean('global', 'replication_slots_polling')):
logging.warning('Force disable replication_slots_polling because use_replication_slots is disabled.')
self.config.set('global', 'replication_slots_polling', 'no')

my_prio = self.config.get('global', 'priority')
self.notifier.ready()
while True:
@@ -371,7 +377,7 @@ def primary_iter(self, db_state, zk_state):
logging.warning('Could not acquire lock due to destructive operation fail: %s', last_op)
return self.release_lock_and_return_to_cluster()
if stream_from:
logging.warning('Host not in HA group We should return to stream_from.')
logging.warning('Host not in HA group. We should return to stream_from.')
return self.release_lock_and_return_to_cluster()

current_promoting_host = zk_state.get(self.zk.CURRENT_PROMOTING_HOST)
@@ -397,6 +403,8 @@ def primary_iter(self, db_state, zk_state):

self.checks['primary_switch'] = 0

self._handle_slots()

self._store_replics_info(db_state, zk_state)

# Make sure local timeline corresponds to that of the cluster.
@@ -634,7 +642,13 @@ def non_ha_replica_iter(self, db_state, zk_state):
db_state['wal_receiver'],
replics_info,
)
if not streaming and not can_delayed:
current_primary = zk_state['lock_holder']

if streaming_from_primary and not streaming:
self._acquire_replication_source_slot_lock(current_primary)
if streaming:
self._acquire_replication_source_slot_lock(stream_from)
elif not can_delayed:
logging.warning('Seems that we are not really streaming WAL from %s.', stream_from)
self._replication_manager.leave_sync_group()
replication_source_is_dead = self._check_host_is_really_dead(primary=stream_from)
@@ -646,7 +660,7 @@ def non_ha_replica_iter(self, db_state, zk_state):
wal_receiver_info and wal_receiver_info[0].get('status') == 'streaming'
)
logging.error(replication_source_replica_info)
current_primary = zk_state['lock_holder']

if replication_source_is_dead:
# Replication source is dead. We need to stream from the primary until the source becomes alive again, then switch back to it.
if stream_from == current_primary or current_primary is None:
@@ -689,6 +703,7 @@ def non_ha_replica_iter(self, db_state, zk_state):
self.checks['primary_switch'] = 0
self.start_pooler()
self._reset_simple_primary_switch_try()
self._handle_slots()
except Exception:
for line in traceback.format_exc().split('\n'):
logging.error(line.rstrip())
@@ -776,6 +791,7 @@ def replica_iter(self, db_state, zk_state):
if holder != db_state['primary_fqdn'] and holder != my_hostname:
self._replication_manager.leave_sync_group()
return self.change_primary(db_state, holder)
self._acquire_replication_source_slot_lock(holder)

self.db.ensure_replaying_wal()

@@ -791,6 +807,7 @@ def replica_iter(self, db_state, zk_state):
self._reset_simple_primary_switch_try()

self._replication_manager.enter_sync_group(replica_infos=replics_info)
self._handle_slots()
except Exception:
for line in traceback.format_exc().split('\n'):
logging.error(line.rstrip())
@@ -1021,7 +1038,6 @@ def _simple_primary_switch(self, limit, new_primary, is_dead):
# The easy way succeeded.
#
logging.info('Simple primary switch succeeded.')
self._primary_switch_handle_slots()
return True
else:
return False
@@ -1073,8 +1089,6 @@ def _attach_to_primary(self, new_primary, limit):
self.checks['primary_switch'] = 0
return None

self._primary_switch_handle_slots()

if not self._wait_for_streaming(limit):
self.checks['primary_switch'] = 0
return None
@@ -1083,24 +1097,50 @@ def _attach_to_primary(self, new_primary, limit):
self.db.checkpoint()
return True

def _primary_switch_handle_slots(self):
need_slots = self.config.getboolean('global', 'use_replication_slots')
if need_slots:
my_hostname = helpers.get_hostname()
hosts = self.zk.get_children(self.zk.MEMBERS_PATH)
if hosts:
if my_hostname in hosts:
hosts.remove(my_hostname)
hosts = [i.replace('.', '_').replace('-', '_') for i in hosts]
logging.debug(hosts)
if not self.db.replication_slots('drop', hosts):
logging.warning('Could not drop replication slots. Do not forget to do it manually!')
def _handle_slots(self):
if not self.config.getboolean('global', 'replication_slots_polling'):
return

my_hostname = helpers.get_hostname()
try:
slot_lock_holders = set(self.zk.get_lock_contenders(os.path.join(self.zk.HOST_REPLICATION_SOURCES, my_hostname), read_lock=True, catch_except=False))
except Exception as e:
logging.warning(
'Could not get slot lock holders: %s. '
'Cannot handle replication slots. We will skip it this time.', e
)
return
all_hosts = self.zk.get_children(self.zk.MEMBERS_PATH)
if not all_hosts:
logging.warning(
'Could not get all hosts list from ZK. '
'Cannot handle replication slots. We will skip it this time.'
)
return
non_holders_hosts = []

for host in all_hosts:
if host in slot_lock_holders:
self._slot_drop_countdown[host] = self.config.getint('global', 'drop_slot_countdown')
else:
logging.warning(
'Could not get all hosts list from ZK. '
'Replication slots should be dropped but we '
'are unable to do it. Skipping it.'
)
if host not in self._slot_drop_countdown:
self._slot_drop_countdown[host] = self.config.getint('global', 'drop_slot_countdown')
self._slot_drop_countdown[host] -= 1
if self._slot_drop_countdown[host] < 0:
non_holders_hosts.append(host)

# create slots
slot_names = [helpers.app_name_from_fqdn(fqdn) for fqdn in slot_lock_holders]

if not self.db.replication_slots('create', slot_names):
logging.warning('Could not create replication slots. %s', slot_names)

# drop slots
if my_hostname in non_holders_hosts:
non_holders_hosts.remove(my_hostname)
slot_names_to_drop = [helpers.app_name_from_fqdn(fqdn) for fqdn in non_holders_hosts]
if not self.db.replication_slots('drop', slot_names_to_drop):
logging.warning('Could not drop replication slots. %s', slot_names_to_drop)

def _get_db_state(self):
state = self.db.get_data_from_control_file('Database cluster state')
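To summarize the `_handle_slots()` logic added above: a slot is kept for every host currently holding the read lock under this host's `replication_sources` node, and is dropped only after that host has been absent for `drop_slot_countdown` consecutive iterations. A simplified, self-contained sketch of that countdown (function and variable names are illustrative, not the project's API):

```python
DROP_SLOT_COUNTDOWN = 10  # mirrors the new 'drop_slot_countdown' default


def slots_to_drop(all_hosts, lock_holders, countdown):
    """Return hosts whose slots may be dropped now; 'countdown' persists between calls."""
    expired = []
    for host in all_hosts:
        if host in lock_holders:
            countdown[host] = DROP_SLOT_COUNTDOWN       # seen again: reset its budget
        else:
            countdown.setdefault(host, DROP_SLOT_COUNTDOWN)
            countdown[host] -= 1                        # one more iteration without the lock
            if countdown[host] < 0:
                expired.append(host)
    return expired


# A host that stops holding the lock survives 10 iterations before being dropped.
state = {}
for _ in range(11):
    gone = slots_to_drop(['replica1'], set(), state)
print(gone)  # ['replica1'] only on the 11th iteration
```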
@@ -1110,6 +1150,25 @@ def _get_db_state(self):
logging.info('Database cluster state is: %s' % state)
return state

def _acquire_replication_source_slot_lock(self, source):
if not self.config.getboolean('global', 'replication_slots_polling'):
return
# We need to drop the slot on the old primary,
# but we don't know which host that was (there may have been several).
# So we release the read lock under every source node except the new one.
replication_sources = self.zk.get_children(self.zk.HOST_REPLICATION_SOURCES)
if replication_sources:
for host in replication_sources:
if source != host:
self.zk.release_if_hold(os.path.join(self.zk.HOST_REPLICATION_SOURCES, host), read_lock=True)
else:
logging.warning(
'Could not get all hosts list from ZK. '
'Cannot release old replication slot locks. We will skip it this time.'
)
# And acquire the read lock on the new source (so it will create a replication slot for us)
self.zk.acquire_lock(os.path.join(self.zk.HOST_REPLICATION_SOURCES, source), read_lock=True)
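The helper above is the replica-side half of the protocol: a replica advertises its current WAL source by holding a shared read lock under that source's znode and releases its claim under every other host. A self-contained sketch of the same idea using kazoo directly (the path prefix and hostnames are made up; the real code goes through the `Zookeeper` wrapper changed in `src/zk.py` below):

```python
from kazoo.client import KazooClient

zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()

SOURCES = '/some_cluster/replication_sources'   # path prefix is an assumption
source = 'primary1.example.net'                 # the host we stream WAL from

# Replica side: advertise the source (shared lock, many concurrent holders allowed).
my_claim = zk.ReadLock(f'{SOURCES}/{source}', 'replica1.example.net')
my_claim.acquire(timeout=3)

# Source side: every contender under my znode is a replica that still needs a slot.
holders = zk.ReadLock(f'{SOURCES}/{source}', source).contenders()
print(holders)  # ['replica1.example.net']

my_claim.release()
zk.stop()
```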

def _return_to_cluster(self, new_primary, role, is_dead=False):
"""
Return to cluster (try stupid method, if it fails we try rewind)
Expand All @@ -1121,6 +1180,7 @@ def _return_to_cluster(self, new_primary, role, is_dead=False):
self.checks['primary_switch'] = 1
logging.debug("primary_switch checks is %d", self.checks['primary_switch'])

self._acquire_replication_source_slot_lock(new_primary)
failover_state = self.zk.noexcept_get(self.zk.FAILOVER_INFO_PATH)
if failover_state is not None and failover_state not in ('finished', 'promoting', 'checkpointing'):
logging.info(
@@ -1207,6 +1267,8 @@ def _promote(self):

logging.info('Promote command failed but we are current primary. Continue')

self._slot_drop_countdown = {}

if not self.zk.noexcept_write(self.zk.FAILOVER_INFO_PATH, 'checkpointing'):
logging.warning('Could not write failover state to ZK.')

@@ -1228,8 +1290,7 @@ def _promote(self):
return True

def _promote_handle_slots(self):
need_slots = self.config.getboolean('global', 'use_replication_slots')
if need_slots:
if self.config.getboolean('global', 'use_replication_slots'):
if not self.zk.write(self.zk.FAILOVER_INFO_PATH, 'creating_slots'):
logging.warning('Could not write failover state to ZK.')

@@ -1241,8 +1302,8 @@ def _promote_handle_slots(self):
'are unable to do it. Releasing the lock.'
)
return False

hosts = [i.replace('.', '_').replace('-', '_') for i in hosts]
# Create replication slots, regardless of whether replicas hold DCS locks for replication slots.
hosts = [helpers.app_name_from_fqdn(fqdn) for fqdn in hosts]
if not self.db.replication_slots('create', hosts):
logging.error('Could not create replication slots. Releasing the lock in ZK.')
return False
2 changes: 1 addition & 1 deletion src/pg.py
@@ -748,7 +748,7 @@ def replication_slots(self, action, slots):
if not self._create_replication_slot(slot):
return False
else:
if current and slot not in current:
if current is not None and slot not in current:
logging.warning('Slot %s does not exist.', slot)
continue
if not self._drop_replication_slot(slot):
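The one-line change above matters because the list of existing slots can legitimately be empty: `current` is `[]` when the lookup succeeded but no slots exist, and (presumably) `None` when the lookup failed. With the old truthiness check an empty list skipped the "does not exist" branch, so the code went on to drop a slot that was never there; the new check only skips that branch when the lookup actually failed. A tiny illustration:

```python
# current may be [] (lookup succeeded, no slots) or None (lookup failed).
slot = 'replica1_example_net'

for current in ([], None, [slot]):
    old_check = bool(current and slot not in current)          # [] -> False: drop attempted anyway
    new_check = current is not None and slot not in current    # [] -> True: treated as already gone
    print(current, old_check, new_check)
```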
40 changes: 25 additions & 15 deletions src/zk.py
@@ -59,6 +59,7 @@ class Zookeeper(object):
MAINTENANCE_PRIMARY_PATH = f'{MAINTENANCE_PATH}/master'
HOST_MAINTENANCE_PATH = f'{MAINTENANCE_PATH}/%s'
HOST_ALIVE_LOCK_PATH = 'alive/%s'
HOST_REPLICATION_SOURCES = 'replication_sources'

SINGLE_NODE_PATH = 'is_single_node'

@@ -188,11 +189,15 @@ def _write(self, path, data, need_lock=True):
logging.error(event.exception)
return not event.exception

def _init_lock(self, name):
def _init_lock(self, name, read_lock=False):
path = self._path_prefix + name
self._locks[name] = self._zk.Lock(path, helpers.get_hostname())
if read_lock:
lock = self._zk.ReadLock(path, helpers.get_hostname())
else:
lock = self._zk.Lock(path, helpers.get_hostname())
self._locks[name] = lock

def _acquire_lock(self, name, allow_queue, timeout):
def _acquire_lock(self, name, allow_queue, timeout, read_lock=False):
if timeout is None:
timeout = self._timeout
if self._zk.state != KazooState.CONNECTED:
@@ -202,14 +207,16 @@ def _acquire_lock(self, name, allow_queue, timeout):
lock = self._locks[name]
else:
logging.debug('No lock instance for %s. Creating one.', name)
self._init_lock(name)
self._init_lock(name, read_lock=read_lock)
lock = self._locks[name]
contenders = lock.contenders()
if len(contenders) != 0:
if contenders[0] == helpers.get_hostname():
if not read_lock:
contenders = contenders[:1]
if helpers.get_hostname() in contenders:
logging.debug('We already hold the %s lock.', name)
return True
if not allow_queue:
if not (allow_queue or read_lock):
logging.warning('%s lock is already taken by %s.', name[0].upper() + name[1:], contenders[0])
return False
try:
@@ -442,14 +449,14 @@ def get_current_lock_version(self):
return min([i.split('__')[-1] for i in children])
return None

def get_lock_contenders(self, name, catch_except=True):
def get_lock_contenders(self, name, catch_except=True, read_lock=False):
"""
Get a list of all hostnames that are competing for the lock,
including the holder.
"""
try:
if name not in self._locks:
self._init_lock(name)
self._init_lock(name, read_lock=read_lock)
contenders = self._locks[name].contenders()
if len(contenders) > 0:
return contenders
@@ -471,18 +478,18 @@ def get_current_lock_holder(self, name=None, catch_except=True):
else:
return None

def acquire_lock(self, lock_type, allow_queue=False, timeout=None):
result = self._acquire_lock(lock_type, allow_queue, timeout)
def acquire_lock(self, lock_type, allow_queue=False, timeout=None, read_lock=False):
result = self._acquire_lock(lock_type, allow_queue, timeout, read_lock=read_lock)
if not result:
raise ZookeeperException(f'Failed to acquire lock {lock_type}')
logging.debug(f'Success acquire lock: {lock_type}')

def try_acquire_lock(self, lock_type=None, allow_queue=False, timeout=None):
def try_acquire_lock(self, lock_type=None, allow_queue=False, timeout=None, read_lock=False):
"""
Acquire lock (leader by default)
"""
lock_type = lock_type or self.PRIMARY_LOCK_PATH
return self._acquire_lock(lock_type, allow_queue, timeout)
return self._acquire_lock(lock_type, allow_queue, timeout, read_lock=read_lock)

def release_lock(self, lock_type=None, wait=0):
"""
@@ -509,9 +516,12 @@ def release_lock(self, lock_type=None, wait=0):
time.sleep(1)
raise RuntimeError('unable to release lock after %i attempts' % wait)

def release_if_hold(self, lock_type, wait=0):
holder = self.get_current_lock_holder(lock_type)
if holder != helpers.get_hostname():
def release_if_hold(self, lock_type, wait=0, read_lock=False):
if read_lock:
holders = self.get_lock_contenders(lock_type, read_lock=read_lock)
else:
holders = [self.get_current_lock_holder(lock_type)]
if helpers.get_hostname() not in holders:
return True
return self.release_lock(lock_type, wait)

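One more note on the `_acquire_lock()` change above: for an exclusive lock only the first contender actually holds it, whereas this code treats every contender of a read lock as a holder, which is why the "do we already hold it" test became a membership check instead of looking only at `contenders[0]`. A minimal sketch of that check:

```python
def already_held(contenders, my_hostname, read_lock):
    """Mirror of the check in _acquire_lock: read locks are shared, so any contender may hold them."""
    if not read_lock:
        contenders = contenders[:1]      # exclusive lock: only the head of the queue holds it
    return my_hostname in contenders


print(already_held(['a', 'b', 'c'], 'b', read_lock=False))  # False: 'b' is merely queued
print(already_held(['a', 'b', 'c'], 'b', read_lock=True))   # True: treated as a concurrent holder
```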