Better management of replication slots #30

Merged · 6 commits · Jul 31, 2024
Changes from 2 commits
1 change: 1 addition & 0 deletions src/__init__.py
@@ -77,6 +77,7 @@ def read_config(filename=None, options=None):
'certfile': None,
'ca_cert': None,
'verify_certs': 'no',
'drop_slot_countdown': 10,
},
'primary': {
'change_replication_type': 'yes',
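The hunk above only adds a default for the new `drop_slot_countdown` option. A hypothetical config excerpt follows (the INI layout is assumed from the ConfigParser-style reads in main.py; only the key name and default value come from this diff):

[global]
use_replication_slots = yes
# Iterations a member may go without holding its replication-source
# read lock before its slot becomes eligible for dropping.
drop_slot_countdown = 10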
106 changes: 79 additions & 27 deletions src/main.py
@@ -53,6 +53,7 @@ def __init__(self, **kwargs):
self.checks = {'primary_switch': 0, 'failover': 0, 'rewind': 0}
self._is_single_node = False
self.notifier = sdnotify.Notifier()
self._slot_drop_countdown = {}

if self.config.getboolean('global', 'quorum_commit'):
self._replication_manager = QuorumReplicationManager(
@@ -304,6 +305,7 @@ def run_iteration(self, my_prio):
self.non_ha_replica_iter(db_state, zk_state)
else:
self.replica_iter(db_state, zk_state)
self._handle_slots()
self.re_init_db()
self.re_init_zk()

@@ -371,7 +373,7 @@ def primary_iter(self, db_state, zk_state):
logging.warning('Could not acquire lock due to destructive operation fail: %s', last_op)
return self.release_lock_and_return_to_cluster()
if stream_from:
logging.warning('Host not in HA group We should return to stream_from.')
logging.warning('Host not in HA group. We should return to stream_from.')
return self.release_lock_and_return_to_cluster()

current_promoting_host = zk_state.get(self.zk.CURRENT_PROMOTING_HOST)
@@ -634,7 +636,11 @@ def non_ha_replica_iter(self, db_state, zk_state):
db_state['wal_receiver'],
replics_info,
)
if not streaming and not can_delayed:
current_primary = zk_state['lock_holder']

if streaming:
self._acquire_replication_source_lock(stream_from)
elif not can_delayed:
logging.warning('Seems that we are not really streaming WAL from %s.', stream_from)
self._replication_manager.leave_sync_group()
replication_source_is_dead = self._check_host_is_really_dead(primary=stream_from)
@@ -646,8 +652,9 @@
wal_receiver_info and wal_receiver_info[0].get('status') == 'streaming'
)
logging.error(replication_source_replica_info)
current_primary = zk_state['lock_holder']

if replication_source_is_dead:
self._acquire_replication_source_lock(current_primary)
# Replication source is dead. We stream from the primary until the source comes back to life and starts streaming from the primary again.
if stream_from == current_primary or current_primary is None:
logging.warning(
@@ -668,6 +675,7 @@
current_primary,
)
else:
self._acquire_replication_source_lock(stream_from)
# Replication source is alive. We wait until it starts streaming from the primary, then start streaming from it.
if replication_source_streams:
logging.warning(
@@ -776,6 +784,7 @@ def replica_iter(self, db_state, zk_state):
if holder != db_state['primary_fqdn'] and holder != my_hostname:
self._replication_manager.leave_sync_group()
return self.change_primary(db_state, holder)
self._acquire_replication_source_lock(holder)

self.db.ensure_replaying_wal()

@@ -1021,7 +1030,6 @@ def _simple_primary_switch(self, limit, new_primary, is_dead):
# The easy way succeeded.
#
logging.info('Simple primary switch succeeded.')
self._primary_switch_handle_slots()
return True
else:
return False
@@ -1073,8 +1081,6 @@ def _attach_to_primary(self, new_primary, limit):
self.checks['primary_switch'] = 0
return None

self._primary_switch_handle_slots()

if not self._wait_for_streaming(limit):
self.checks['primary_switch'] = 0
return None
@@ -1083,24 +1089,49 @@
self.db.checkpoint()
return True

def _primary_switch_handle_slots(self):
need_slots = self.config.getboolean('global', 'use_replication_slots')
if need_slots:
my_hostname = helpers.get_hostname()
hosts = self.zk.get_children(self.zk.MEMBERS_PATH)
if hosts:
if my_hostname in hosts:
hosts.remove(my_hostname)
hosts = [i.replace('.', '_').replace('-', '_') for i in hosts]
logging.debug(hosts)
if not self.db.replication_slots('drop', hosts):
logging.warning('Could not drop replication slots. Do not forget to do it manually!')
else:
logging.warning(
'Could not get all hosts list from ZK. '
'Replication slots should be dropped but we '
'are unable to do it. Skipping it.'
)
def _handle_slots(self):
if not self.config.getboolean('global', 'use_replication_slots'):
return

my_hostname = helpers.get_hostname()
try:
slot_lock_holders = self.zk.get_lock_contenders(os.path.join(self.zk.HOST_REPLICATION_SOURCES, my_hostname), read_lock=True, catch_except=False)
except Exception as e:
logging.warning(
'Could not get slot lock holders: %s. '
'Cannot handle replication slots; skipping this time.', e
)
return
all_hosts = self.zk.get_children(self.zk.MEMBERS_PATH)
if not all_hosts:
logging.warning(
'Could not get the list of all hosts from ZK. '
'Cannot handle replication slots; skipping this time.'
)
return
hosts_except_lock_holders = list(set(all_hosts) - set(slot_lock_holders))
non_holders_hosts = []
for host in hosts_except_lock_holders:
if host not in self._slot_drop_countdown:
self._slot_drop_countdown[host] = self.config.getfloat('global', 'drop_slot_countdown')
self._slot_drop_countdown[host] -= 1
if self._slot_drop_countdown[host] <= 0:
non_holders_hosts.append(host)
for host in slot_lock_holders:
self._slot_drop_countdown[host] = self.config.getfloat('global', 'drop_slot_countdown')

# create slots
slot_names = [i.replace('.', '_').replace('-', '_') for i in slot_lock_holders]

if not self.db.replication_slots('create', slot_names):
logging.warning('Could not create replication slots. %s', slot_names)

# drop slots
if my_hostname in non_holders_hosts:
non_holders_hosts.remove(my_hostname)
slot_names_to_drop = [i.replace('.', '_').replace('-', '_') for i in non_holders_hosts]
if not self.db.replication_slots('drop', slot_names_to_drop):
logging.warning('Could not drop replication slots. %s', slot_names_to_drop)
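A note on the countdown above: slots are no longer dropped at switchover time. Each iteration decrements a per-host counter for every member that does not currently hold the read lock on this host's replication-sources node, and seeing a holder again resets its counter, so a slot is dropped only after `drop_slot_countdown` consecutive iterations without the lock. A minimal self-contained sketch of that semantics (names are illustrative, not from the PR):

DROP_SLOT_COUNTDOWN = 10  # mirrors the new config default

def hosts_to_drop(all_hosts, lock_holders, countdown_state):
    """Return hosts whose replication slots may be dropped this iteration."""
    droppable = []
    for host in set(all_hosts) - set(lock_holders):
        countdown_state.setdefault(host, DROP_SLOT_COUNTDOWN)
        countdown_state[host] -= 1
        if countdown_state[host] <= 0:
            droppable.append(host)
    for host in lock_holders:
        countdown_state[host] = DROP_SLOT_COUNTDOWN  # lock seen: reset grace period
    return droppable

state = {}
for _ in range(10):  # ten iterations without replica2 holding the lock...
    dropped = hosts_to_drop(['replica1', 'replica2'], ['replica1'], state)
print(dropped)  # ['replica2'] -- now safe to drop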

def _get_db_state(self):
state = self.db.get_data_from_control_file('Database cluster state')
Expand All @@ -1110,6 +1141,25 @@ def _get_db_state(self):
logging.info('Database cluster state is: %s' % state)
return state

def _acquire_replication_source_lock(self, primary):
if not self.config.getboolean('global', 'use_replication_slots'):
return
# We need to drop our slot on the old primary,
# but we don't know which host that was (there may even have been several).
# So we release the read lock on every host except the new primary.
all_hosts = self.zk.get_children(self.zk.MEMBERS_PATH)
if all_hosts:
for host in all_hosts:
if primary != host:
self.zk.release_if_hold(os.path.join(self.zk.HOST_REPLICATION_SOURCES, host), read_lock=True)
else:
logging.warning(
'Could not get the list of all hosts from ZK. '
'Cannot release old replication slot locks; skipping this time.'
)
# Then acquire the lock (so the new primary will create a replication slot for us).
self.zk.acquire_lock(os.path.join(self.zk.HOST_REPLICATION_SOURCES, primary), read_lock=True)
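Why a ReadLock rather than the exclusive Lock used elsewhere: several replicas can stream from the same source, so all of them must hold the lock on that source's node simultaneously, and the source keeps a slot per contender. The same reasoning drives the zk.py change below, where for read locks any position in the contenders list counts as holding the lock, not just contenders[0]. A small kazoo sketch of the shared semantics (connection strings and the path prefix are illustrative assumptions):

from kazoo.client import KazooClient

zk_a = KazooClient(hosts='127.0.0.1:2181')
zk_b = KazooClient(hosts='127.0.0.1:2181')
zk_a.start()
zk_b.start()

path = '/pgconsul/replication_sources/primary1'  # hypothetical prefix
lock_a = zk_a.ReadLock(path, 'replica-a')
lock_b = zk_b.ReadLock(path, 'replica-b')

assert lock_a.acquire(timeout=5)
assert lock_b.acquire(timeout=5)  # both succeed: read locks are shared
print(lock_a.contenders())        # ['replica-a', 'replica-b']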

def _return_to_cluster(self, new_primary, role, is_dead=False):
"""
Return to cluster (try stupid method, if it fails we try rewind)
@@ -1121,6 +1171,7 @@
self.checks['primary_switch'] = 1
logging.debug("primary_switch checks is %d", self.checks['primary_switch'])

self._acquire_replication_source_lock(new_primary)
failover_state = self.zk.noexcept_get(self.zk.FAILOVER_INFO_PATH)
if failover_state is not None and failover_state not in ('finished', 'promoting', 'checkpointing'):
logging.info(
@@ -1207,6 +1258,8 @@ def _promote(self):

logging.info('Promote command failed but we are current primary. Continue')

self._slot_drop_countdown = {}

if not self.zk.noexcept_write(self.zk.FAILOVER_INFO_PATH, 'checkpointing'):
logging.warning('Could not write failover state to ZK.')

@@ -1228,8 +1281,7 @@ def _promote_handle_slots(self):
return True

def _promote_handle_slots(self):
need_slots = self.config.getboolean('global', 'use_replication_slots')
if need_slots:
if self.config.getboolean('global', 'use_replication_slots'):
if not self.zk.write(self.zk.FAILOVER_INFO_PATH, 'creating_slots'):
logging.warning('Could not write failover state to ZK.')

@@ -1241,7 +1293,7 @@
'are unable to do it. Releasing the lock.'
)
return False

# Create replication slots, regardless of whether replicas hold DCS locks for replication slots.
hosts = [i.replace('.', '_').replace('-', '_') for i in hosts]
if not self.db.replication_slots('create', hosts):
logging.error('Could not create replication slots. Releasing the lock in ZK.')
2 changes: 1 addition & 1 deletion src/pg.py
@@ -748,7 +748,7 @@ def replication_slots(self, action, slots):
if not self._create_replication_slot(slot):
return False
else:
if current and slot not in current:
if current is not None and slot not in current:
logging.warning('Slot %s does not exist.', slot)
continue
if not self._drop_replication_slot(slot):
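The one-line pg.py change fixes a truthiness bug: when the server reports an empty slot list, the old `if current and slot not in current` guard was skipped entirely, so the code went on to try dropping a slot that does not exist. `current is not None` keeps the guard for `[]` while bypassing it only when the lookup returned `None`. A tiny demonstration (the function name is illustrative):

def should_skip_drop(current, slot):
    # An empty list means "no slots exist", which is a valid answer.
    return current is not None and slot not in current

print(should_skip_drop([], 'replica1_example_net'))    # True  (old guard gave False)
print(should_skip_drop(None, 'replica1_example_net'))  # False (lookup failed)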
41 changes: 27 additions & 14 deletions src/zk.py
@@ -59,6 +59,7 @@ class Zookeeper(object):
MAINTENANCE_PRIMARY_PATH = f'{MAINTENANCE_PATH}/master'
HOST_MAINTENANCE_PATH = f'{MAINTENANCE_PATH}/%s'
HOST_ALIVE_LOCK_PATH = 'alive/%s'
HOST_REPLICATION_SOURCES = 'replication_sources'

SINGLE_NODE_PATH = 'is_single_node'

@@ -188,11 +189,17 @@ def _write(self, path, data, need_lock=True):
logging.error(event.exception)
return not event.exception

def _init_lock(self, name):
def _init_lock(self, name, read_lock=False):
path = self._path_prefix + name
self._locks[name] = self._zk.Lock(path, helpers.get_hostname())
if read_lock:
lock = self._zk.ReadLock(path, helpers.get_hostname())
else:
lock = self._zk.Lock(path, helpers.get_hostname())
self._locks[name] = lock

def _acquire_lock(self, name, allow_queue, timeout):
def _acquire_lock(self, name, allow_queue, timeout, read_lock=False):
if read_lock:
allow_queue = True
if timeout is None:
timeout = self._timeout
if self._zk.state != KazooState.CONNECTED:
@@ -202,11 +209,13 @@ def _acquire_lock(self, name, allow_queue, timeout):
lock = self._locks[name]
else:
logging.debug('No lock instance for %s. Creating one.', name)
self._init_lock(name)
self._init_lock(name, read_lock=read_lock)
lock = self._locks[name]
contenders = lock.contenders()
if len(contenders) != 0:
if contenders[0] == helpers.get_hostname():
if not read_lock:
contenders = contenders[:1]
if helpers.get_hostname() in contenders:
logging.debug('We already hold the %s lock.', name)
return True
if not allow_queue:
munakoiso marked this conversation as resolved.
Show resolved Hide resolved
@@ -442,14 +451,14 @@ def get_current_lock_version(self):
return min([i.split('__')[-1] for i in children])
return None

def get_lock_contenders(self, name, catch_except=True):
def get_lock_contenders(self, name, catch_except=True, read_lock=False):
"""
Get a list of all hostnames that are competing for the lock,
including the holder.
"""
try:
if name not in self._locks:
self._init_lock(name)
self._init_lock(name, read_lock=read_lock)
contenders = self._locks[name].contenders()
if len(contenders) > 0:
return contenders
@@ -471,18 +480,18 @@ def get_current_lock_holder(self, name=None, catch_except=True):
else:
return None

def acquire_lock(self, lock_type, allow_queue=False, timeout=None):
result = self._acquire_lock(lock_type, allow_queue, timeout)
def acquire_lock(self, lock_type, allow_queue=False, timeout=None, read_lock=False):
result = self._acquire_lock(lock_type, allow_queue, timeout, read_lock=read_lock)
if not result:
raise ZookeeperException(f'Failed to acquire lock {lock_type}')
logging.debug(f'Success acquire lock: {lock_type}')

def try_acquire_lock(self, lock_type=None, allow_queue=False, timeout=None):
def try_acquire_lock(self, lock_type=None, allow_queue=False, timeout=None, read_lock=False):
"""
Acquire lock (leader by default)
"""
lock_type = lock_type or self.PRIMARY_LOCK_PATH
return self._acquire_lock(lock_type, allow_queue, timeout)
return self._acquire_lock(lock_type, allow_queue, timeout, read_lock=read_lock)

def release_lock(self, lock_type=None, wait=0):
"""
@@ -509,9 +518,13 @@ def release_lock(self, lock_type=None, wait=0):
time.sleep(1)
raise RuntimeError('unable to release lock after %i attempts' % wait)

def release_if_hold(self, lock_type, wait=0):
holder = self.get_current_lock_holder(lock_type)
if holder != helpers.get_hostname():
def release_if_hold(self, lock_type, wait=0, read_lock=False):
lock_type = lock_type or self.PRIMARY_LOCK_PATH
if read_lock:
holders = self.get_lock_contenders(lock_type, read_lock=read_lock)
else:
holders = [self.get_current_lock_holder(lock_type)]
if helpers.get_hostname() not in holders:
return True
return self.release_lock(lock_type, wait)
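The holder checks in _acquire_lock and release_if_hold change for the same reason: an exclusive lock has exactly one holder (the head of the contenders queue), while a read lock may have many, so "do I hold it?" must scan the whole list. A toy illustration of the two checks (pure Python, no ZooKeeper):

def i_hold(contenders, me, read_lock=False):
    if not read_lock:
        contenders = contenders[:1]  # exclusive lock: only the head holds it
    return me in contenders

print(i_hold(['a', 'b'], 'b'))                  # False: 'b' is only queued
print(i_hold(['a', 'b'], 'b', read_lock=True))  # True: both hold the read lock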
