Better managing of replication slots
munakoiso committed Jul 23, 2024
1 parent 71f48d1 commit 88448a9
Showing 7 changed files with 237 additions and 47 deletions.
1 change: 1 addition & 0 deletions src/__init__.py
@@ -77,6 +77,7 @@ def read_config(filename=None, options=None):
'certfile': None,
'ca_cert': None,
'verify_certs': 'no',
'drop_slot_countdown': 10,
},
'primary': {
'change_replication_type': 'yes',
103 changes: 79 additions & 24 deletions src/main.py
@@ -53,6 +53,7 @@ def __init__(self, **kwargs):
self.checks = {'primary_switch': 0, 'failover': 0, 'rewind': 0}
self._is_single_node = False
self.notifier = sdnotify.Notifier()
self._slot_drop_countdown = {}

if self.config.getboolean('global', 'quorum_commit'):
self._replication_manager = QuorumReplicationManager(
@@ -304,6 +305,7 @@ def run_iteration(self, my_prio):
self.non_ha_replica_iter(db_state, zk_state)
else:
self.replica_iter(db_state, zk_state)
self._handle_slots()
self.re_init_db()
self.re_init_zk()

@@ -371,7 +373,7 @@ def primary_iter(self, db_state, zk_state):
logging.warning('Could not acquire lock due to destructive operation fail: %s', last_op)
return self.release_lock_and_return_to_cluster()
if stream_from:
logging.warning('Host not in HA group We should return to stream_from.')
logging.warning('Host not in HA group. We should return to stream_from.')
return self.release_lock_and_return_to_cluster()

current_promoting_host = zk_state.get(self.zk.CURRENT_PROMOTING_HOST)
@@ -634,7 +636,11 @@ def non_ha_replica_iter(self, db_state, zk_state):
db_state['wal_receiver'],
replics_info,
)
if not streaming and not can_delayed:
current_primary = zk_state['lock_holder']

if streaming:
self._acquire_replication_source_lock(stream_from)
elif not can_delayed:
logging.warning('Seems that we are not really streaming WAL from %s.', stream_from)
self._replication_manager.leave_sync_group()
replication_source_is_dead = self._check_host_is_really_dead(primary=stream_from)
@@ -646,8 +652,9 @@
wal_receiver_info and wal_receiver_info[0].get('status') == 'streaming'
)
logging.error(replication_source_replica_info)
current_primary = zk_state['lock_holder']

if replication_source_is_dead:
self._acquire_replication_source_lock(current_primary)
# Replication source is dead. We need to stream from the primary until the source becomes alive and starts streaming from the primary again.
if stream_from == current_primary or current_primary is None:
logging.warning(
@@ -668,6 +675,7 @@
current_primary,
)
else:
self._acquire_replication_source_lock(stream_from)
# Replication source is alive. We need to wait until it starts streaming from the primary, and then start streaming from it.
if replication_source_streams:
logging.warning(
@@ -776,6 +784,7 @@ def replica_iter(self, db_state, zk_state):
if holder != db_state['primary_fqdn'] and holder != my_hostname:
self._replication_manager.leave_sync_group()
return self.change_primary(db_state, holder)
self._acquire_replication_source_lock(holder)

self.db.ensure_replaying_wal()

@@ -1021,7 +1030,6 @@ def _simple_primary_switch(self, limit, new_primary, is_dead):
# The easy way succeeded.
#
logging.info('Simple primary switch succeeded.')
self._primary_switch_handle_slots()
return True
else:
return False
@@ -1073,8 +1081,6 @@ def _attach_to_primary(self, new_primary, limit):
self.checks['primary_switch'] = 0
return None

self._primary_switch_handle_slots()

if not self._wait_for_streaming(limit):
self.checks['primary_switch'] = 0
return None
@@ -1083,24 +1089,50 @@
self.db.checkpoint()
return True

def _primary_switch_handle_slots(self):
def _handle_slots(self):
need_slots = self.config.getboolean('global', 'use_replication_slots')
if need_slots:
my_hostname = helpers.get_hostname()
hosts = self.zk.get_children(self.zk.MEMBERS_PATH)
if hosts:
if my_hostname in hosts:
hosts.remove(my_hostname)
hosts = [i.replace('.', '_').replace('-', '_') for i in hosts]
logging.debug(hosts)
if not self.db.replication_slots('drop', hosts):
logging.warning('Could not drop replication slots. Do not forget to do it manually!')
else:
logging.warning(
'Could not get all hosts list from ZK. '
'Replication slots should be dropped but we '
'are unable to do it. Skipping it.'
)
if not need_slots:
return

my_hostname = helpers.get_hostname()
try:
slot_lock_holders = self.zk.get_lock_contenders(os.path.join(self.zk.HOST_REPLICATION_SOURCES, my_hostname), read_lock=True, catch_except=False)
except Exception as e:
logging.warning(
'Could not get slot lock holders: %s. '
'Cannot handle replication slots. We will skip it this time.', e
)
return
all_hosts = self.zk.get_children(self.zk.MEMBERS_PATH)
if not all_hosts:
logging.warning(
'Could not get all hosts list from ZK. '
'Cannot handle replication slots. We will skip it this time.'
)
return
hosts_except_lock_holders = list(set(all_hosts) - set(slot_lock_holders))
non_holders_hosts = []
for host in hosts_except_lock_holders:
if host not in self._slot_drop_countdown:
self._slot_drop_countdown[host] = self.config.getfloat('global', 'drop_slot_countdown')
self._slot_drop_countdown[host] -= 1
if self._slot_drop_countdown[host] <= 0:
non_holders_hosts.append(host)
for host in slot_lock_holders:
self._slot_drop_countdown[host] = self.config.getfloat('global', 'drop_slot_countdown')

# create slots
slot_names = [i.replace('.', '_').replace('-', '_') for i in slot_lock_holders]

if not self.db.replication_slots('create', slot_names):
logging.warning('Could not create replication slots. %s', slot_names)

# drop slots
if my_hostname in non_holders_hosts:
non_holders_hosts.remove(my_hostname)
slot_names_to_drop = [i.replace('.', '_').replace('-', '_') for i in non_holders_hosts]
if not self.db.replication_slots('drop', slot_names_to_drop):
logging.warning('Could not drop replication slots. %s', slot_names_to_drop)

def _get_db_state(self):
state = self.db.get_data_from_control_file('Database cluster state')
@@ -1110,6 +1142,26 @@ def _get_db_state(self):
logging.info('Database cluster state is: %s' % state)
return state

def _acquire_replication_source_lock(self, primary):
need_slots = self.config.getboolean('global', 'use_replication_slots')
if not need_slots:
return
# We need to drop the slot on the old primary.
# But we don't know which host was the primary (there may have been several).
# So we release the lock on all hosts.
all_hosts = self.zk.get_children(self.zk.MEMBERS_PATH)
if all_hosts:
for host in all_hosts:
if primary != host:
self.zk.release_if_hold(os.path.join(self.zk.HOST_REPLICATION_SOURCES, host), read_lock=True)
else:
logging.warning(
'Could not get all hosts list from ZK. '
'Cannot release old replication slot locks. We will skip it this time.'
)
# And acquire the lock (then the primary will create the replication slot).
self.zk.acquire_lock(os.path.join(self.zk.HOST_REPLICATION_SOURCES, primary), read_lock=True)

def _return_to_cluster(self, new_primary, role, is_dead=False):
"""
Return to cluster (try stupid method, if it fails we try rewind)
@@ -1121,6 +1173,7 @@ def _return_to_cluster(self, new_primary, role, is_dead=False):
self.checks['primary_switch'] = 1
logging.debug("primary_switch checks is %d", self.checks['primary_switch'])

self._acquire_replication_source_lock(new_primary)
failover_state = self.zk.noexcept_get(self.zk.FAILOVER_INFO_PATH)
if failover_state is not None and failover_state not in ('finished', 'promoting', 'checkpointing'):
logging.info(
@@ -1207,6 +1260,8 @@ def _promote(self):

logging.info('Promote command failed but we are current primary. Continue')

self._slot_drop_countdown = {}

if not self.zk.noexcept_write(self.zk.FAILOVER_INFO_PATH, 'checkpointing'):
logging.warning('Could not write failover state to ZK.')

@@ -1241,7 +1296,7 @@ def _promote_handle_slots(self):
'are unable to do it. Releasing the lock.'
)
return False

# Create replication slots, regardless of whether replicas hold DCS locks for replication slots.
hosts = [i.replace('.', '_').replace('-', '_') for i in hosts]
if not self.db.replication_slots('create', hosts):
logging.error('Could not create replication slots. Releasing the lock in ZK.')
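To make the new _handle_slots logic above easier to follow, here is a minimal standalone sketch of the countdown idea, assuming only that slots are named after hosts with dots and dashes replaced by underscores: hosts that currently hold the read lock on their replication_sources node get their slot (re)created and their countdown reset, while hosts without the lock are counted down once per iteration and their slot is dropped only when the countdown reaches zero. The iteration helper, the DROP_SLOT_COUNTDOWN constant and the example hostnames below are invented for illustration; they are not part of the commit.

# Simplified model of the drop_slot_countdown behaviour (illustration only,
# no ZooKeeper or PostgreSQL involved).
DROP_SLOT_COUNTDOWN = 10

def slot_name(host):
    return host.replace('.', '_').replace('-', '_')

def iteration(all_hosts, lock_holders, countdown, slots):
    # Hosts holding the read lock keep their slot and get their countdown reset.
    for host in lock_holders:
        countdown[host] = DROP_SLOT_COUNTDOWN
        slots.add(slot_name(host))
    # Hosts without the lock are not dropped immediately: the countdown is
    # decremented once per iteration and the slot is dropped only at zero.
    for host in set(all_hosts) - set(lock_holders):
        countdown[host] = countdown.get(host, DROP_SLOT_COUNTDOWN) - 1
        if countdown[host] <= 0:
            slots.discard(slot_name(host))
    return countdown, slots

if __name__ == '__main__':
    hosts = ['pg-a.example.net', 'pg-b.example.net']
    countdown, slots = {}, {slot_name(h) for h in hosts}
    # pg-b stops streaming: its slot survives 10 iterations, then is dropped.
    for _ in range(10):
        countdown, slots = iteration(hosts, ['pg-a.example.net'], countdown, slots)
    print(slots)  # {'pg_a_example_net'}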
2 changes: 1 addition & 1 deletion src/pg.py
@@ -748,7 +748,7 @@ def replication_slots(self, action, slots):
if not self._create_replication_slot(slot):
return False
else:
if current and slot not in current:
if current is not None and slot not in current:
logging.warning('Slot %s does not exist.', slot)
continue
if not self._drop_replication_slot(slot):
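The one-line change to replication_slots in pg.py matters because an empty list is falsy in Python: with the old `if current and slot not in current:` guard, an empty list of existing slots skipped the check entirely and the code went on to try dropping a slot that does not exist. A tiny illustration (the slot name is hypothetical):

current = []                      # no slots exist on this host
slot = 'pg_b_example_net'

# Old guard: a falsy empty list means the membership test never runs,
# so a drop of a non-existent slot would be attempted.
print(bool(current and slot not in current))               # False

# New guard: only a failed lookup (current is None) skips the test,
# so the missing slot is detected and the drop is skipped.
print(bool(current is not None and slot not in current))   # True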
41 changes: 27 additions & 14 deletions src/zk.py
@@ -59,6 +59,7 @@ class Zookeeper(object):
MAINTENANCE_PRIMARY_PATH = f'{MAINTENANCE_PATH}/master'
HOST_MAINTENANCE_PATH = f'{MAINTENANCE_PATH}/%s'
HOST_ALIVE_LOCK_PATH = 'alive/%s'
HOST_REPLICATION_SOURCES = 'replication_sources'

SINGLE_NODE_PATH = 'is_single_node'

@@ -188,11 +189,17 @@ def _write(self, path, data, need_lock=True):
logging.error(event.exception)
return not event.exception

def _init_lock(self, name):
def _init_lock(self, name, read_lock=False):
path = self._path_prefix + name
self._locks[name] = self._zk.Lock(path, helpers.get_hostname())
if read_lock:
lock = self._zk.ReadLock(path, helpers.get_hostname())
else:
lock = self._zk.Lock(path, helpers.get_hostname())
self._locks[name] = lock

def _acquire_lock(self, name, allow_queue, timeout):
def _acquire_lock(self, name, allow_queue, timeout, read_lock=False):
if read_lock:
allow_queue = True
if timeout is None:
timeout = self._timeout
if self._zk.state != KazooState.CONNECTED:
@@ -202,11 +209,13 @@ def _acquire_lock(self, name, allow_queue, timeout):
lock = self._locks[name]
else:
logging.debug('No lock instance for %s. Creating one.', name)
self._init_lock(name)
self._init_lock(name, read_lock=read_lock)
lock = self._locks[name]
contenders = lock.contenders()
if len(contenders) != 0:
if contenders[0] == helpers.get_hostname():
if not read_lock:
contenders = contenders[:1]
if helpers.get_hostname() in contenders:
logging.debug('We already hold the %s lock.', name)
return True
if not allow_queue:
@@ -442,14 +451,14 @@ def get_current_lock_version(self):
return min([i.split('__')[-1] for i in children])
return None

def get_lock_contenders(self, name, catch_except=True):
def get_lock_contenders(self, name, catch_except=True, read_lock=False):
"""
Get a list of all hostnames that are competing for the lock,
including the holder.
"""
try:
if name not in self._locks:
self._init_lock(name)
self._init_lock(name, read_lock=read_lock)
contenders = self._locks[name].contenders()
if len(contenders) > 0:
return contenders
@@ -471,18 +480,18 @@ def get_current_lock_holder(self, name=None, catch_except=True):
else:
return None

def acquire_lock(self, lock_type, allow_queue=False, timeout=None):
result = self._acquire_lock(lock_type, allow_queue, timeout)
def acquire_lock(self, lock_type, allow_queue=False, timeout=None, read_lock=False):
result = self._acquire_lock(lock_type, allow_queue, timeout, read_lock=read_lock)
if not result:
raise ZookeeperException(f'Failed to acquire lock {lock_type}')
logging.debug(f'Successfully acquired lock: {lock_type}')

def try_acquire_lock(self, lock_type=None, allow_queue=False, timeout=None):
def try_acquire_lock(self, lock_type=None, allow_queue=False, timeout=None, read_lock=False):
"""
Acquire lock (leader by default)
"""
lock_type = lock_type or self.PRIMARY_LOCK_PATH
return self._acquire_lock(lock_type, allow_queue, timeout)
return self._acquire_lock(lock_type, allow_queue, timeout, read_lock=read_lock)

def release_lock(self, lock_type=None, wait=0):
"""
@@ -509,9 +518,13 @@ def release_lock(self, lock_type=None, wait=0):
time.sleep(1)
raise RuntimeError('unable to release lock after %i attempts' % wait)

def release_if_hold(self, lock_type, wait=0):
holder = self.get_current_lock_holder(lock_type)
if holder != helpers.get_hostname():
def release_if_hold(self, lock_type, wait=0, read_lock=False):
lock_type = lock_type or self.PRIMARY_LOCK_PATH
if read_lock:
holders = self.get_lock_contenders(lock_type, read_lock=read_lock)
else:
holders = [self.get_current_lock_holder(lock_type)]
if helpers.get_hostname() not in holders:
return True
return self.release_lock(lock_type, wait)

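The read-lock plumbing added to zk.py builds on Kazoo's ReadLock recipe, which, unlike the exclusive Lock used for leader election, can be held by many contenders at once. That is what lets every replica keep a shared lock on its replication source's node. A minimal sketch of that behaviour, assuming a local ZooKeeper and a made-up path (this is not code from the repository):

from kazoo.client import KazooClient

zk = KazooClient(hosts='127.0.0.1:2181')  # placeholder address
zk.start()

path = '/example/replication_sources/primary-host'  # hypothetical node

# Two different "hosts" take the same read lock; both acquisitions succeed
# because read locks are shared.
lock_a = zk.ReadLock(path, 'replica-a')
lock_b = zk.ReadLock(path, 'replica-b')
assert lock_a.acquire(timeout=5)
assert lock_b.acquire(timeout=5)

# contenders() lists every identifier attached to the lock, which is what
# _handle_slots uses to decide which replication slots must be kept.
print(lock_a.contenders())  # ['replica-a', 'replica-b']

lock_a.release()
lock_b.release()
zk.stop()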