diff --git a/src/__init__.py b/src/__init__.py
index 3e20c42..5430fdb 100644
--- a/src/__init__.py
+++ b/src/__init__.py
@@ -77,6 +77,8 @@ def read_config(filename=None, options=None):
             'certfile': None,
             'ca_cert': None,
             'verify_certs': 'no',
+            'drop_slot_countdown': 10,
+            'replication_slots_polling': None,
         },
         'primary': {
             'change_replication_type': 'yes',
@@ -131,6 +133,8 @@ def read_config(filename=None, options=None):
     for key, value in defaults[section].items():
         if not config.has_option(section, key):
             config.set(section, key, value)
+    if config.get('global', 'replication_slots_polling') is None:
+        config.set('global', 'replication_slots_polling', config.get('global', 'use_replication_slots'))
 
     #
     # Rewriting global config with parameters from command line.
diff --git a/src/main.py b/src/main.py
index 82ff1c9..3a17cf5 100644
--- a/src/main.py
+++ b/src/main.py
@@ -53,6 +53,7 @@ def __init__(self, **kwargs):
         self.checks = {'primary_switch': 0, 'failover': 0, 'rewind': 0}
         self._is_single_node = False
         self.notifier = sdnotify.Notifier()
+        self._slot_drop_countdown = {}
 
         if self.config.getboolean('global', 'quorum_commit'):
             self._replication_manager = QuorumReplicationManager(
@@ -199,6 +200,11 @@ def start(self):
         """
         Start iterations
        """
+        if (not self.config.getboolean('global', 'use_replication_slots') and
+                self.config.getboolean('global', 'replication_slots_polling')):
+            logging.warning('Force-disabling replication_slots_polling because use_replication_slots is disabled.')
+            self.config.set('global', 'replication_slots_polling', 'no')
+
         my_prio = self.config.get('global', 'priority')
         self.notifier.ready()
         while True:
@@ -371,7 +377,7 @@ def primary_iter(self, db_state, zk_state):
                 logging.warning('Could not acquire lock due to destructive operation fail: %s', last_op)
                 return self.release_lock_and_return_to_cluster()
             if stream_from:
-                logging.warning('Host not in HA group We should return to stream_from.')
+                logging.warning('Host not in HA group. We should return to stream_from.')
                 return self.release_lock_and_return_to_cluster()
 
             current_promoting_host = zk_state.get(self.zk.CURRENT_PROMOTING_HOST)
@@ -397,6 +403,8 @@ def primary_iter(self, db_state, zk_state):
 
             self.checks['primary_switch'] = 0
 
+            self._handle_slots()
+
             self._store_replics_info(db_state, zk_state)
 
             # Make sure local timeline corresponds to that of the cluster.
@@ -634,7 +642,13 @@ def non_ha_replica_iter(self, db_state, zk_state):
                 db_state['wal_receiver'],
                 replics_info,
             )
-            if not streaming and not can_delayed:
+            current_primary = zk_state['lock_holder']
+
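+            # Hold the read lock on our replication source in ZK so that it
+            # keeps (or creates) a replication slot for this host.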
+            if streaming_from_primary and not streaming:
+                self._acquire_replication_source_slot_lock(current_primary)
+            if streaming:
+                self._acquire_replication_source_slot_lock(stream_from)
+            elif not can_delayed:
                 logging.warning('Seems that we are not really streaming WAL from %s.', stream_from)
                 self._replication_manager.leave_sync_group()
                 replication_source_is_dead = self._check_host_is_really_dead(primary=stream_from)
@@ -646,7 +660,7 @@ def non_ha_replica_iter(self, db_state, zk_state):
                     wal_receiver_info and wal_receiver_info[0].get('status') == 'streaming'
                 )
                 logging.error(replication_source_replica_info)
-                current_primary = zk_state['lock_holder']
+
                 if replication_source_is_dead:
                     # Replication source is dead. We need to streaming from primary while it became alive and start streaming from primary.
                     if stream_from == current_primary or current_primary is None:
@@ -689,6 +703,7 @@ def non_ha_replica_iter(self, db_state, zk_state):
             self.checks['primary_switch'] = 0
             self.start_pooler()
             self._reset_simple_primary_switch_try()
+            self._handle_slots()
         except Exception:
             for line in traceback.format_exc().split('\n'):
                 logging.error(line.rstrip())
@@ -776,6 +791,7 @@ def replica_iter(self, db_state, zk_state):
                 if holder != db_state['primary_fqdn'] and holder != my_hostname:
                     self._replication_manager.leave_sync_group()
                     return self.change_primary(db_state, holder)
+                self._acquire_replication_source_slot_lock(holder)
 
                 self.db.ensure_replaying_wal()
 
@@ -791,6 +807,7 @@ def replica_iter(self, db_state, zk_state):
                 self._reset_simple_primary_switch_try()
 
             self._replication_manager.enter_sync_group(replica_infos=replics_info)
+            self._handle_slots()
         except Exception:
             for line in traceback.format_exc().split('\n'):
                 logging.error(line.rstrip())
@@ -1021,7 +1038,6 @@ def _simple_primary_switch(self, limit, new_primary, is_dead):
             #
             # The easy way succeeded.
             #
             logging.info('Simple primary switch succeeded.')
-            self._primary_switch_handle_slots()
             return True
         else:
             return False
@@ -1073,8 +1089,6 @@ def _attach_to_primary(self, new_primary, limit):
                 self.checks['primary_switch'] = 0
                 return None
 
-        self._primary_switch_handle_slots()
-
         if not self._wait_for_streaming(limit):
             self.checks['primary_switch'] = 0
             return None
@@ -1083,24 +1097,50 @@ def _attach_to_primary(self, new_primary, limit):
         self.db.checkpoint()
         return True
 
-    def _primary_switch_handle_slots(self):
-        need_slots = self.config.getboolean('global', 'use_replication_slots')
-        if need_slots:
-            my_hostname = helpers.get_hostname()
-            hosts = self.zk.get_children(self.zk.MEMBERS_PATH)
-            if hosts:
-                if my_hostname in hosts:
-                    hosts.remove(my_hostname)
-                hosts = [i.replace('.', '_').replace('-', '_') for i in hosts]
-                logging.debug(hosts)
-                if not self.db.replication_slots('drop', hosts):
-                    logging.warning('Could not drop replication slots. Do not forget to do it manually!')
+    def _handle_slots(self):
+        if not self.config.getboolean('global', 'replication_slots_polling'):
+            return
+
+        my_hostname = helpers.get_hostname()
+        try:
+            slot_lock_holders = set(self.zk.get_lock_contenders(os.path.join(self.zk.HOST_REPLICATION_SOURCES, my_hostname), read_lock=True, catch_except=False))
+        except Exception as e:
+            logging.warning(
+                'Could not get slot lock holders: %s. '
+                'Cannot handle replication slots, skipping this time.', e
+            )
+            return
+        all_hosts = self.zk.get_children(self.zk.MEMBERS_PATH)
+        if not all_hosts:
+            logging.warning(
+                'Could not get all hosts list from ZK. '
+                'Cannot handle replication slots, skipping this time.'
+            )
+            return
+        non_holders_hosts = []
+
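+        # Contenders on the read lock under replication_sources/<my_hostname> are
+        # the hosts currently streaming from us: each of them gets a slot. A host
+        # that no longer holds the lock only has its slot dropped after it has
+        # spent drop_slot_countdown consecutive iterations without the lock.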
+        for host in all_hosts:
+            if host in slot_lock_holders:
+                self._slot_drop_countdown[host] = self.config.getint('global', 'drop_slot_countdown')
             else:
-                logging.warning(
-                    'Could not get all hosts list from ZK. '
-                    'Replication slots should be dropped but we '
-                    'are unable to do it. Skipping it.'
-                )
+                if host not in self._slot_drop_countdown:
+                    self._slot_drop_countdown[host] = self.config.getint('global', 'drop_slot_countdown')
+                self._slot_drop_countdown[host] -= 1
+                if self._slot_drop_countdown[host] < 0:
+                    non_holders_hosts.append(host)
+
+        # create slots
+        slot_names = [helpers.app_name_from_fqdn(fqdn) for fqdn in slot_lock_holders]
+
+        if not self.db.replication_slots('create', slot_names):
+            logging.warning('Could not create replication slots. %s', slot_names)
+
+        # drop slots
+        if my_hostname in non_holders_hosts:
+            non_holders_hosts.remove(my_hostname)
+        slot_names_to_drop = [helpers.app_name_from_fqdn(fqdn) for fqdn in non_holders_hosts]
+        if not self.db.replication_slots('drop', slot_names_to_drop):
+            logging.warning('Could not drop replication slots. %s', slot_names_to_drop)
 
     def _get_db_state(self):
         state = self.db.get_data_from_control_file('Database cluster state')
@@ -1110,6 +1150,25 @@ def _get_db_state(self):
             logging.info('Database cluster state is: %s' % state)
         return state
 
+    def _acquire_replication_source_slot_lock(self, source):
+        if not self.config.getboolean('global', 'replication_slots_polling'):
+            return
+        # The old primary has to drop our slot, but we do not know which host
+        # that was (there may even be more than one), so we release our read
+        # lock on every host except the new source.
+        replication_sources = self.zk.get_children(self.zk.HOST_REPLICATION_SOURCES)
+        if replication_sources:
+            for host in replication_sources:
+                if source != host:
+                    self.zk.release_if_hold(os.path.join(self.zk.HOST_REPLICATION_SOURCES, host), read_lock=True)
+        else:
+            logging.warning(
+                'Could not get all hosts list from ZK. '
+                'Cannot release old replication slot locks, skipping this time.'
+            )
+        # Then acquire the read lock on the new source so that it creates a replication slot for us.
+        self.zk.acquire_lock(os.path.join(self.zk.HOST_REPLICATION_SOURCES, source), read_lock=True)
+
     def _return_to_cluster(self, new_primary, role, is_dead=False):
         """
         Return to cluster (try stupid method, if it fails we try rewind)
@@ -1121,6 +1180,7 @@ def _return_to_cluster(self, new_primary, role, is_dead=False):
             self.checks['primary_switch'] = 1
         logging.debug("primary_switch checks is %d", self.checks['primary_switch'])
 
+        self._acquire_replication_source_slot_lock(new_primary)
         failover_state = self.zk.noexcept_get(self.zk.FAILOVER_INFO_PATH)
         if failover_state is not None and failover_state not in ('finished', 'promoting', 'checkpointing'):
             logging.info(
@@ -1207,6 +1267,8 @@ def _promote(self):
 
             logging.info('Promote command failed but we are current primary. Continue')
 
+        self._slot_drop_countdown = {}
+
         if not self.zk.noexcept_write(self.zk.FAILOVER_INFO_PATH, 'checkpointing'):
             logging.warning('Could not write failover state to ZK.')
 
@@ -1228,8 +1290,7 @@ def _promote(self):
         return True
 
     def _promote_handle_slots(self):
-        need_slots = self.config.getboolean('global', 'use_replication_slots')
-        if need_slots:
+        if self.config.getboolean('global', 'use_replication_slots'):
             if not self.zk.write(self.zk.FAILOVER_INFO_PATH, 'creating_slots'):
                 logging.warning('Could not write failover state to ZK.')
 
@@ -1241,8 +1302,8 @@ def _promote_handle_slots(self):
                     'are unable to do it. Releasing the lock.'
                 )
                 return False
-
-            hosts = [i.replace('.', '_').replace('-', '_') for i in hosts]
+            # Create replication slots regardless of whether replicas hold DCS locks for replication slots.
+            hosts = [helpers.app_name_from_fqdn(fqdn) for fqdn in hosts]
             if not self.db.replication_slots('create', hosts):
                 logging.error('Could not create replication slots. Releasing the lock in ZK.')
                 return False
diff --git a/src/pg.py b/src/pg.py
index 921e6d0..baa5000 100644
--- a/src/pg.py
+++ b/src/pg.py
@@ -748,7 +748,7 @@ def replication_slots(self, action, slots):
                 if not self._create_replication_slot(slot):
                     return False
             else:
-                if current and slot not in current:
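+                # An empty list means we know the slot is absent and the drop can
+                # be skipped; only None (slot list unavailable) falls through.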
+                if current is not None and slot not in current:
                     logging.warning('Slot %s does not exist.', slot)
                     continue
                 if not self._drop_replication_slot(slot):
diff --git a/src/zk.py b/src/zk.py
index a3811ca..5b6aefa 100644
--- a/src/zk.py
+++ b/src/zk.py
@@ -59,6 +59,7 @@ class Zookeeper(object):
     MAINTENANCE_PRIMARY_PATH = f'{MAINTENANCE_PATH}/master'
     HOST_MAINTENANCE_PATH = f'{MAINTENANCE_PATH}/%s'
     HOST_ALIVE_LOCK_PATH = 'alive/%s'
+    HOST_REPLICATION_SOURCES = 'replication_sources'
     SINGLE_NODE_PATH = 'is_single_node'
 
@@ -188,11 +189,15 @@ def _write(self, path, data, need_lock=True):
             logging.error(event.exception)
         return not event.exception
 
-    def _init_lock(self, name):
+    def _init_lock(self, name, read_lock=False):
         path = self._path_prefix + name
-        self._locks[name] = self._zk.Lock(path, helpers.get_hostname())
+        if read_lock:
+            lock = self._zk.ReadLock(path, helpers.get_hostname())
+        else:
+            lock = self._zk.Lock(path, helpers.get_hostname())
+        self._locks[name] = lock
 
-    def _acquire_lock(self, name, allow_queue, timeout):
+    def _acquire_lock(self, name, allow_queue, timeout, read_lock=False):
         if timeout is None:
             timeout = self._timeout
         if self._zk.state != KazooState.CONNECTED:
@@ -202,14 +207,16 @@ def _acquire_lock(self, name, allow_queue, timeout, read_lock=False):
             lock = self._locks[name]
         else:
             logging.debug('No lock instance for %s. Creating one.', name)
-            self._init_lock(name)
+            self._init_lock(name, read_lock=read_lock)
             lock = self._locks[name]
         contenders = lock.contenders()
         if len(contenders) != 0:
-            if contenders[0] == helpers.get_hostname():
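+            # A read lock may be held by several contenders at once; for an
+            # exclusive lock only the first contender is the actual holder.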
""" try: if name not in self._locks: - self._init_lock(name) + self._init_lock(name, read_lock=read_lock) contenders = self._locks[name].contenders() if len(contenders) > 0: return contenders @@ -471,18 +478,18 @@ def get_current_lock_holder(self, name=None, catch_except=True): else: return None - def acquire_lock(self, lock_type, allow_queue=False, timeout=None): - result = self._acquire_lock(lock_type, allow_queue, timeout) + def acquire_lock(self, lock_type, allow_queue=False, timeout=None, read_lock=False): + result = self._acquire_lock(lock_type, allow_queue, timeout, read_lock=read_lock) if not result: raise ZookeeperException(f'Failed to acquire lock {lock_type}') logging.debug(f'Success acquire lock: {lock_type}') - def try_acquire_lock(self, lock_type=None, allow_queue=False, timeout=None): + def try_acquire_lock(self, lock_type=None, allow_queue=False, timeout=None, read_lock=False): """ Acquire lock (leader by default) """ lock_type = lock_type or self.PRIMARY_LOCK_PATH - return self._acquire_lock(lock_type, allow_queue, timeout) + return self._acquire_lock(lock_type, allow_queue, timeout, read_lock=read_lock) def release_lock(self, lock_type=None, wait=0): """ @@ -509,9 +516,12 @@ def release_lock(self, lock_type=None, wait=0): time.sleep(1) raise RuntimeError('unable to release lock after %i attempts' % wait) - def release_if_hold(self, lock_type, wait=0): - holder = self.get_current_lock_holder(lock_type) - if holder != helpers.get_hostname(): + def release_if_hold(self, lock_type, wait=0, read_lock=False): + if read_lock: + holders = self.get_lock_contenders(lock_type, read_lock=read_lock) + else: + holders = [self.get_current_lock_holder(lock_type)] + if helpers.get_hostname() not in holders: return True return self.release_lock(lock_type, wait) diff --git a/tests/features/cascade.feature b/tests/features/cascade.feature index 481ea6b..7836d04 100644 --- a/tests/features/cascade.feature +++ b/tests/features/cascade.feature @@ -354,3 +354,115 @@ Feature: Check not HA hosts Examples: , | lock_type | lock_host | | zookeeper | zookeeper1 | + + + Scenario Outline: Replication slots created automatically + Given a "pgconsul" container common config + """ + pgconsul.conf: + global: + priority: 0 + use_replication_slots: 'yes' + quorum_commit: 'yes' + primary: + change_replication_type: 'yes' + primary_switch_checks: 1 + replica: + allow_potential_data_loss: 'no' + primary_unavailability_timeout: 1 + primary_switch_checks: 1 + min_failover_timeout: 1 + primary_unavailability_timeout: 2 + recovery_timeout: 30 + commands: + generate_recovery_conf: /usr/local/bin/gen_rec_conf_without_slot.sh %m %p + """ + Given a following cluster with "" with replication slots + """ + postgresql1: + role: primary + postgresql2: + role: replica + postgresql3: + role: replica + config: + pgconsul.conf: + global: + stream_from: pgconsul_postgresql2_1.pgconsul_pgconsul_net + stream_from: postgresql2 + """ + + Then "" has holder "pgconsul_postgresql1_1.pgconsul_pgconsul_net" for lock "/pgconsul/postgresql/leader" + And container "postgresql2" is in quorum group + Then container "postgresql1" has following replication slots + """ + - slot_name: pgconsul_postgresql2_1_pgconsul_pgconsul_net + slot_type: physical + """ + And container "postgresql2" has following replication slots + """ + - slot_name: pgconsul_postgresql3_1_pgconsul_pgconsul_net + slot_type: physical + """ + When we disconnect from network container "postgresql3" + Then container "postgresql2" has following replication slots + """ + - 
+        if read_lock:
+            holders = self.get_lock_contenders(lock_type, read_lock=read_lock)
+        else:
+            holders = [self.get_current_lock_holder(lock_type)]
+        if helpers.get_hostname() not in holders:
             return True
         return self.release_lock(lock_type, wait)
 
diff --git a/tests/features/cascade.feature b/tests/features/cascade.feature
index 481ea6b..7836d04 100644
--- a/tests/features/cascade.feature
+++ b/tests/features/cascade.feature
@@ -354,3 +354,115 @@ Feature: Check not HA hosts
     Examples: <lock_type>, <lock_host>
         | lock_type | lock_host  |
         | zookeeper | zookeeper1 |
+
+
+    Scenario Outline: Replication slots created automatically
+        Given a "pgconsul" container common config
+        """
+        pgconsul.conf:
+            global:
+                priority: 0
+                use_replication_slots: 'yes'
+                quorum_commit: 'yes'
+            primary:
+                change_replication_type: 'yes'
+                primary_switch_checks: 1
+            replica:
+                allow_potential_data_loss: 'no'
+                primary_unavailability_timeout: 1
+                primary_switch_checks: 1
+                min_failover_timeout: 1
+                primary_unavailability_timeout: 2
+                recovery_timeout: 30
+            commands:
+                generate_recovery_conf: /usr/local/bin/gen_rec_conf_without_slot.sh %m %p
+        """
+        Given a following cluster with "<lock_type>" with replication slots
+        """
+        postgresql1:
+            role: primary
+        postgresql2:
+            role: replica
+        postgresql3:
+            role: replica
+            config:
+                pgconsul.conf:
+                    global:
+                        stream_from: pgconsul_postgresql2_1.pgconsul_pgconsul_net
+            stream_from: postgresql2
+        """
+
+        Then "<lock_type>" has holder "pgconsul_postgresql1_1.pgconsul_pgconsul_net" for lock "/pgconsul/postgresql/leader"
+        And container "postgresql2" is in quorum group
+        Then container "postgresql1" has following replication slots
+        """
+        - slot_name: pgconsul_postgresql2_1_pgconsul_pgconsul_net
+          slot_type: physical
+        """
+        And container "postgresql2" has following replication slots
+        """
+        - slot_name: pgconsul_postgresql3_1_pgconsul_pgconsul_net
+          slot_type: physical
+        """
+        When we disconnect from network container "postgresql3"
+        Then container "postgresql2" has following replication slots
+        """
+        - slot_name: pgconsul_postgresql3_1_pgconsul_pgconsul_net
+          slot_type: physical
+        """
+        When we wait "10.0" seconds
+        Then container "postgresql2" has following replication slots
+        """
+        """
+        And container "postgresql1" has following replication slots
+        """
+        - slot_name: pgconsul_postgresql2_1_pgconsul_pgconsul_net
+          slot_type: physical
+        """
+        When we connect to network container "postgresql3"
+        Then container "postgresql2" has following replication slots
+        """
+        - slot_name: pgconsul_postgresql3_1_pgconsul_pgconsul_net
+          slot_type: physical
+        """
+        When we disconnect from network container "postgresql2"
+        Then container "postgresql1" has following replication slots
+        """
+        - slot_name: pgconsul_postgresql2_1_pgconsul_pgconsul_net
+          slot_type: physical
+        - slot_name: pgconsul_postgresql3_1_pgconsul_pgconsul_net
+          slot_type: physical
+        """
+        When we wait "10.0" seconds
+        Then container "postgresql1" has following replication slots
+        """
+        - slot_name: pgconsul_postgresql3_1_pgconsul_pgconsul_net
+          slot_type: physical
+        """
+        When we connect to network container "postgresql2"
+        Then container "postgresql1" has following replication slots
+        """
+        - slot_name: pgconsul_postgresql2_1_pgconsul_pgconsul_net
+          slot_type: physical
+        - slot_name: pgconsul_postgresql3_1_pgconsul_pgconsul_net
+          slot_type: physical
+        """
+        And container "postgresql2" has following replication slots
+        """
+        - slot_name: pgconsul_postgresql3_1_pgconsul_pgconsul_net
+          slot_type: physical
+        """
+        When we wait "10.0" seconds
+        Then container "postgresql1" has following replication slots
+        """
+        - slot_name: pgconsul_postgresql2_1_pgconsul_pgconsul_net
+          slot_type: physical
+        """
+        And container "postgresql2" has following replication slots
+        """
+        - slot_name: pgconsul_postgresql3_1_pgconsul_pgconsul_net
+          slot_type: physical
+        """
+    Examples: <lock_type>, <lock_host>
+        | lock_type | lock_host  |
+        | zookeeper | zookeeper1 |
diff --git a/tests/features/kill_primary.feature b/tests/features/kill_primary.feature
index 27782e2..c110f56 100644
--- a/tests/features/kill_primary.feature
+++ b/tests/features/kill_primary.feature
@@ -343,8 +343,15 @@ Feature: Destroy primary in various scenarios
         - client_hostname: pgconsul_postgresql3_1.pgconsul_pgconsul_net
           state: streaming
         """
+        When we set value "no" for option "replication_slots_polling" in section "global" in pgconsul config in container "postgresql3"
+        And we restart "pgconsul" in container "postgresql3"
         When we stop container "postgresql3"
-        When we drop replication slot "pgconsul_postgresql3_1_pgconsul_pgconsul_net" in container "postgresql1"
+        And we wait "10.0" seconds
+        Then container "postgresql1" has following replication slots
+        """
+        - slot_name: pgconsul_postgresql2_1_pgconsul_pgconsul_net
+          slot_type: physical
+        """
         When we start container "postgresql3"
         Then "<lock_type>" has value "['pgconsul_postgresql2_1.pgconsul_pgconsul_net']" for key "/pgconsul/postgresql/quorum"
         When we wait "10.0" seconds
@@ -352,6 +359,8 @@ Feature: Destroy primary in various scenarios
         Then "<lock_type>" has holder "pgconsul_postgresql2_1.pgconsul_pgconsul_net" for lock "/pgconsul/postgresql/leader"
         Then container "postgresql2" became a primary
         Then "<lock_type>" has value "finished" for key "/pgconsul/postgresql/failover_state"
+        When we set value "yes" for option "replication_slots_polling" in section "global" in pgconsul config in container "postgresql3"
+        And we restart "pgconsul" in container "postgresql3"
         Then container "postgresql3" is in quorum group
         Then "<lock_type>" has following values for key "/pgconsul/postgresql/replics_info"
         """
diff --git a/tests/steps/database.py b/tests/steps/database.py
index 002199b..7ba488e 100644
--- a/tests/steps/database.py
+++ b/tests/steps/database.py
@@ -8,6 +8,7 @@
 
 import psycopg2
 from psycopg2.extras import RealDictCursor
+from psycopg2.errors import DuplicateObject
 
 import select
 
@@ -111,13 +112,16 @@ def get_config_option(self, option):
         return deepcopy(self.cursor.fetchone())['opt']
 
     def create_replication_slot(self, slot_name):
-        self.cursor.execute(
-            """
-            SELECT pg_create_physical_replication_slot(%(name)s)
-            """,
-            {'name': slot_name},
-        )
-        return deepcopy(self.cursor.fetchone())
+        try:
+            self.cursor.execute(
+                """
+                SELECT pg_create_physical_replication_slot(%(name)s)
+                """,
+                {'name': slot_name},
+            )
+            return deepcopy(self.cursor.fetchone())
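+        # The slot may already exist because pgconsul now creates slots itself;
+        # treat that as success so this step stays idempotent.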
+        except DuplicateObject:
+            return [True]
 
     def get_replication_slots(self):
         self.cursor.execute(