diff --git a/src/__init__.py b/src/__init__.py
index 3e20c42..7aa6890 100644
--- a/src/__init__.py
+++ b/src/__init__.py
@@ -77,6 +77,7 @@ def read_config(filename=None, options=None):
             'certfile': None,
             'ca_cert': None,
             'verify_certs': 'no',
+            'drop_slot_countdown': 10,
         },
         'primary': {
             'change_replication_type': 'yes',
diff --git a/src/main.py b/src/main.py
index 82ff1c9..e105998 100644
--- a/src/main.py
+++ b/src/main.py
@@ -53,6 +53,7 @@ def __init__(self, **kwargs):
         self.checks = {'primary_switch': 0, 'failover': 0, 'rewind': 0}
         self._is_single_node = False
         self.notifier = sdnotify.Notifier()
+        self._slot_drop_countdown = {}
 
         if self.config.getboolean('global', 'quorum_commit'):
             self._replication_manager = QuorumReplicationManager(
@@ -304,6 +305,7 @@ def run_iteration(self, my_prio):
                 self.non_ha_replica_iter(db_state, zk_state)
             else:
                 self.replica_iter(db_state, zk_state)
+        self._handle_slots()
 
         self.re_init_db()
         self.re_init_zk()
@@ -371,7 +373,7 @@ def primary_iter(self, db_state, zk_state):
                 logging.warning('Could not acquire lock due to destructive operation fail: %s', last_op)
                 return self.release_lock_and_return_to_cluster()
             if stream_from:
-                logging.warning('Host not in HA group We should return to stream_from.')
+                logging.warning('Host not in HA group. We should return to stream_from.')
                 return self.release_lock_and_return_to_cluster()
 
             current_promoting_host = zk_state.get(self.zk.CURRENT_PROMOTING_HOST)
@@ -634,7 +636,11 @@ def non_ha_replica_iter(self, db_state, zk_state):
                 db_state['wal_receiver'],
                 replics_info,
             )
-            if not streaming and not can_delayed:
+            current_primary = zk_state['lock_holder']
+
+            if streaming:
+                self._acquire_replication_source_lock(stream_from)
+            elif not can_delayed:
                 logging.warning('Seems that we are not really streaming WAL from %s.', stream_from)
                 self._replication_manager.leave_sync_group()
                 replication_source_is_dead = self._check_host_is_really_dead(primary=stream_from)
@@ -646,8 +652,9 @@ def non_ha_replica_iter(self, db_state, zk_state):
                     wal_receiver_info and wal_receiver_info[0].get('status') == 'streaming'
                 )
                 logging.error(replication_source_replica_info)
-                current_primary = zk_state['lock_holder']
+
                 if replication_source_is_dead:
+                    self._acquire_replication_source_lock(current_primary)
                     # Replication source is dead. We need to stream from the primary until the source becomes alive again, and then resume streaming from it.
                     if stream_from == current_primary or current_primary is None:
                         logging.warning(
@@ -668,6 +675,7 @@ def non_ha_replica_iter(self, db_state, zk_state):
                             current_primary,
                         )
                 else:
+                    self._acquire_replication_source_lock(stream_from)
                     # Replication source is alive. We need to wait until it starts streaming from the primary, and then start streaming from it.
                     if replication_source_streams:
                         logging.warning(
@@ -776,6 +784,7 @@ def replica_iter(self, db_state, zk_state):
             if holder != db_state['primary_fqdn'] and holder != my_hostname:
                 self._replication_manager.leave_sync_group()
                 return self.change_primary(db_state, holder)
+            self._acquire_replication_source_lock(holder)
 
             self.db.ensure_replaying_wal()
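Note: in the hunks above, every replica now advertises its replication source on each iteration by holding a shared ZooKeeper lock named after that source. A minimal sketch of the protocol with kazoo follows; the ZooKeeper address, lock path, and hostnames are illustrative, not taken from this patch:

    from kazoo.client import KazooClient

    zk = KazooClient(hosts='zookeeper1:2181')  # assumed test address
    zk.start()

    # The replica holds a shared (read) lock on the node named after the
    # host it streams from; many replicas may hold the same lock at once.
    source = 'replication_sources/pg-source.example.net'  # hypothetical path
    lock = zk.ReadLock(source, identifier='pg-replica.example.net')
    lock.acquire(blocking=False)

    # The slot owner reads contenders() to learn which slots to keep alive.
    print(lock.contenders())

    zk.stop()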
@@ -1021,7 +1030,6 @@ def _simple_primary_switch(self, limit, new_primary, is_dead):
             # The easy way succeeded.
             logging.info('Simple primary switch succeeded.')
-            self._primary_switch_handle_slots()
             return True
         else:
             return False
@@ -1073,8 +1081,6 @@ def _attach_to_primary(self, new_primary, limit):
             self.checks['primary_switch'] = 0
             return None
 
-        self._primary_switch_handle_slots()
-
         if not self._wait_for_streaming(limit):
             self.checks['primary_switch'] = 0
             return None
@@ -1083,24 +1089,50 @@ def _attach_to_primary(self, new_primary, limit):
         self.db.checkpoint()
         return True
 
-    def _primary_switch_handle_slots(self):
+    def _handle_slots(self):
         need_slots = self.config.getboolean('global', 'use_replication_slots')
-        if need_slots:
-            my_hostname = helpers.get_hostname()
-            hosts = self.zk.get_children(self.zk.MEMBERS_PATH)
-            if hosts:
-                if my_hostname in hosts:
-                    hosts.remove(my_hostname)
-                hosts = [i.replace('.', '_').replace('-', '_') for i in hosts]
-                logging.debug(hosts)
-                if not self.db.replication_slots('drop', hosts):
-                    logging.warning('Could not drop replication slots. Do not forget to do it manually!')
-            else:
-                logging.warning(
-                    'Could not get all hosts list from ZK. '
-                    'Replication slots should be dropped but we '
-                    'are unable to do it. Skipping it.'
-                )
+        if not need_slots:
+            return
+
+        my_hostname = helpers.get_hostname()
+        try:
+            slot_lock_holders = self.zk.get_lock_contenders(os.path.join(self.zk.HOST_REPLICATION_SOURCES, my_hostname), read_lock=True, catch_except=False)
+        except Exception as e:
+            logging.warning(
+                'Could not get slot lock holders: %s. '
+                'Cannot handle replication slots, skipping it this time.', e
+            )
+            return
+        all_hosts = self.zk.get_children(self.zk.MEMBERS_PATH)
+        if not all_hosts:
+            logging.warning(
+                'Could not get all hosts list from ZK. '
+                'Cannot handle replication slots, skipping it this time.'
+            )
+            return
+        hosts_except_lock_holders = list(set(all_hosts) - set(slot_lock_holders))
+        non_holders_hosts = []
+        for host in hosts_except_lock_holders:
+            if host not in self._slot_drop_countdown:
+                self._slot_drop_countdown[host] = self.config.getfloat('global', 'drop_slot_countdown')
+            self._slot_drop_countdown[host] -= 1
+            if self._slot_drop_countdown[host] <= 0:
+                non_holders_hosts.append(host)
+        for host in slot_lock_holders:
+            self._slot_drop_countdown[host] = self.config.getfloat('global', 'drop_slot_countdown')
+
+        # create slots
+        slot_names = [i.replace('.', '_').replace('-', '_') for i in slot_lock_holders]
+
+        if not self.db.replication_slots('create', slot_names):
+            logging.warning('Could not create replication slots: %s', slot_names)
+
+        # drop slots
+        if my_hostname in non_holders_hosts:
+            non_holders_hosts.remove(my_hostname)
+        slot_names_to_drop = [i.replace('.', '_').replace('-', '_') for i in non_holders_hosts]
+        if not self.db.replication_slots('drop', slot_names_to_drop):
+            logging.warning('Could not drop replication slots: %s', slot_names_to_drop)
 
     def _get_db_state(self):
         state = self.db.get_data_from_control_file('Database cluster state')
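Note: _handle_slots above debounces slot removal with drop_slot_countdown, so a host must be absent from the lock contenders for several consecutive iterations before its slot is dropped. A self-contained sketch of just that debouncing logic; the constant and host names are illustrative (pgconsul reads the value from config):

    DROP_SLOT_COUNTDOWN = 10

    countdown = {}

    def hosts_to_drop(all_hosts, lock_holders):
        """Return hosts whose slots may be dropped this iteration."""
        stale = []
        for host in set(all_hosts) - set(lock_holders):
            countdown.setdefault(host, DROP_SLOT_COUNTDOWN)
            countdown[host] -= 1
            if countdown[host] <= 0:
                stale.append(host)
        for host in lock_holders:  # any sign of life resets the timer
            countdown[host] = DROP_SLOT_COUNTDOWN
        return stale

    # A host must miss 10 straight iterations before it is reported:
    for _ in range(9):
        assert hosts_to_drop(['replica1'], []) == []
    assert hosts_to_drop(['replica1'], []) == ['replica1']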
@@ -1110,6 +1142,26 @@ def _get_db_state(self):
         logging.info('Database cluster state is: %s' % state)
         return state
 
+    def _acquire_replication_source_lock(self, primary):
+        need_slots = self.config.getboolean('global', 'use_replication_slots')
+        if not need_slots:
+            return
+        # We need to drop the slot on the old primary.
+        # But we don't know who the primary was (possibly there were several of them).
+        # So, we release the lock on all hosts except the current primary.
+        all_hosts = self.zk.get_children(self.zk.MEMBERS_PATH)
+        if all_hosts:
+            for host in all_hosts:
+                if primary != host:
+                    self.zk.release_if_hold(os.path.join(self.zk.HOST_REPLICATION_SOURCES, host), read_lock=True)
+        else:
+            logging.warning(
+                'Could not get all hosts list from ZK. '
+                'Cannot release old replication slot locks, skipping it this time.'
+            )
+        # And acquire the lock (then the primary will create a replication slot for us).
+        self.zk.acquire_lock(os.path.join(self.zk.HOST_REPLICATION_SOURCES, primary), read_lock=True)
+
     def _return_to_cluster(self, new_primary, role, is_dead=False):
         """
         Return to cluster (try stupid method, if it fails we try rewind)
         """
@@ -1121,6 +1173,7 @@ def _return_to_cluster(self, new_primary, role, is_dead=False):
             self.checks['primary_switch'] = 1
         logging.debug("primary_switch checks is %d", self.checks['primary_switch'])
 
+        self._acquire_replication_source_lock(new_primary)
         failover_state = self.zk.noexcept_get(self.zk.FAILOVER_INFO_PATH)
         if failover_state is not None and failover_state not in ('finished', 'promoting', 'checkpointing'):
             logging.info(
@@ -1207,6 +1260,8 @@ def _promote(self):
             logging.info('Promote command failed but we are current primary. Continue')
 
+        self._slot_drop_countdown = {}
+
         if not self.zk.noexcept_write(self.zk.FAILOVER_INFO_PATH, 'checkpointing'):
             logging.warning('Could not write failover state to ZK.')
@@ -1241,7 +1296,7 @@ def _promote_handle_slots(self):
                 'are unable to do it. Releasing the lock.'
             )
             return False
-
+        # Create replication slots regardless of whether replicas hold DCS locks for them.
         hosts = [i.replace('.', '_').replace('-', '_') for i in hosts]
         if not self.db.replication_slots('create', hosts):
             logging.error('Could not create replication slots. Releasing the lock in ZK.')
diff --git a/src/pg.py b/src/pg.py
index 921e6d0..baa5000 100644
--- a/src/pg.py
+++ b/src/pg.py
@@ -748,7 +748,7 @@ def replication_slots(self, action, slots):
                 if not self._create_replication_slot(slot):
                     return False
             else:
-                if current and slot not in current:
+                if current is not None and slot not in current:
                     logging.warning('Slot %s does not exist.', slot)
                     continue
                 if not self._drop_replication_slot(slot):
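Note: the pg.py one-liner above fixes a truthiness bug. An empty slot list is falsy, so the old `current and slot not in current` check skipped the "does not exist" branch exactly when no slots existed, and the code went on to drop a slot that was never there. A condensed illustration with hypothetical values:

    current = []            # no slots exist on this host
    slot = 'replica1_slot'

    old = bool(current) and slot not in current        # False -> tries to drop
    new = current is not None and slot not in current  # True  -> warns and skips

    assert old is False and new is True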
diff --git a/src/zk.py b/src/zk.py
index a3811ca..01192b4 100644
--- a/src/zk.py
+++ b/src/zk.py
@@ -59,6 +59,7 @@ class Zookeeper(object):
     MAINTENANCE_PRIMARY_PATH = f'{MAINTENANCE_PATH}/master'
     HOST_MAINTENANCE_PATH = f'{MAINTENANCE_PATH}/%s'
     HOST_ALIVE_LOCK_PATH = 'alive/%s'
+    HOST_REPLICATION_SOURCES = 'replication_sources'
 
     SINGLE_NODE_PATH = 'is_single_node'
 
@@ -188,11 +189,17 @@ def _write(self, path, data, need_lock=True):
             logging.error(event.exception)
         return not event.exception
 
-    def _init_lock(self, name):
+    def _init_lock(self, name, read_lock=False):
         path = self._path_prefix + name
-        self._locks[name] = self._zk.Lock(path, helpers.get_hostname())
+        if read_lock:
+            lock = self._zk.ReadLock(path, helpers.get_hostname())
+        else:
+            lock = self._zk.Lock(path, helpers.get_hostname())
+        self._locks[name] = lock
 
-    def _acquire_lock(self, name, allow_queue, timeout):
+    def _acquire_lock(self, name, allow_queue, timeout, read_lock=False):
+        if read_lock:
+            allow_queue = True
         if timeout is None:
             timeout = self._timeout
         if self._zk.state != KazooState.CONNECTED:
@@ -202,11 +209,13 @@ def _acquire_lock(self, name, allow_queue, timeout):
             lock = self._locks[name]
         else:
             logging.debug('No lock instance for %s. Creating one.', name)
-            self._init_lock(name)
+            self._init_lock(name, read_lock=read_lock)
             lock = self._locks[name]
         contenders = lock.contenders()
         if len(contenders) != 0:
-            if contenders[0] == helpers.get_hostname():
+            if not read_lock:
+                contenders = contenders[:1]
+            if helpers.get_hostname() in contenders:
                 logging.debug('We already hold the %s lock.', name)
                 return True
             if not allow_queue:
@@ -442,14 +451,14 @@ def get_current_lock_version(self):
             return min([i.split('__')[-1] for i in children])
         return None
 
-    def get_lock_contenders(self, name, catch_except=True):
+    def get_lock_contenders(self, name, catch_except=True, read_lock=False):
         """
        Get a list of all hostnames that are competing for the lock,
        including the holder.
         """
         try:
             if name not in self._locks:
-                self._init_lock(name)
+                self._init_lock(name, read_lock=read_lock)
             contenders = self._locks[name].contenders()
             if len(contenders) > 0:
                 return contenders
@@ -471,18 +480,18 @@ def get_current_lock_holder(self, name=None, catch_except=True):
         else:
             return None
 
-    def acquire_lock(self, lock_type, allow_queue=False, timeout=None):
-        result = self._acquire_lock(lock_type, allow_queue, timeout)
+    def acquire_lock(self, lock_type, allow_queue=False, timeout=None, read_lock=False):
+        result = self._acquire_lock(lock_type, allow_queue, timeout, read_lock=read_lock)
         if not result:
             raise ZookeeperException(f'Failed to acquire lock {lock_type}')
         logging.debug(f'Success acquire lock: {lock_type}')
 
-    def try_acquire_lock(self, lock_type=None, allow_queue=False, timeout=None):
+    def try_acquire_lock(self, lock_type=None, allow_queue=False, timeout=None, read_lock=False):
         """
         Acquire lock (leader by default)
         """
         lock_type = lock_type or self.PRIMARY_LOCK_PATH
-        return self._acquire_lock(lock_type, allow_queue, timeout)
+        return self._acquire_lock(lock_type, allow_queue, timeout, read_lock=read_lock)
 
     def release_lock(self, lock_type=None, wait=0):
         """
@@ -509,9 +518,13 @@ def release_lock(self, lock_type=None, wait=0):
                 time.sleep(1)
         raise RuntimeError('unable to release lock after %i attempts' % wait)
 
-    def release_if_hold(self, lock_type, wait=0):
-        holder = self.get_current_lock_holder(lock_type)
-        if holder != helpers.get_hostname():
+    def release_if_hold(self, lock_type, wait=0, read_lock=False):
+        lock_type = lock_type or self.PRIMARY_LOCK_PATH
+        if read_lock:
+            holders = self.get_lock_contenders(lock_type, read_lock=read_lock)
+        else:
+            holders = [self.get_current_lock_holder(lock_type)]
+        if helpers.get_hostname() not in holders:
             return True
         return self.release_lock(lock_type, wait)
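Note: the _acquire_lock changes rely on two properties of kazoo shared locks: several clients can hold a ReadLock on the same path at once, and contenders() lists every holder, which is why "do we already hold it?" becomes a membership test instead of a look at contenders[0]. A minimal check of those semantics; it assumes a ZooKeeper reachable at 127.0.0.1:2181, and the identifiers are illustrative:

    from kazoo.client import KazooClient

    a = KazooClient(hosts='127.0.0.1:2181')
    b = KazooClient(hosts='127.0.0.1:2181')
    a.start()
    b.start()

    # Shared locks: both clients hold the same path at once.
    ra = a.ReadLock('/locks/demo', identifier='host-a')
    rb = b.ReadLock('/locks/demo', identifier='host-b')
    assert ra.acquire(blocking=False)
    assert rb.acquire(blocking=False)

    # contenders() lists every holder, not only the first in queue.
    assert set(ra.contenders()) == {'host-a', 'host-b'}

    ra.release()
    rb.release()
    a.stop()
    b.stop()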
diff --git a/tests/features/cascade.feature b/tests/features/cascade.feature
index 481ea6b..e6ed972 100644
--- a/tests/features/cascade.feature
+++ b/tests/features/cascade.feature
@@ -354,3 +354,115 @@ Feature: Check not HA hosts
     Examples: <lock_type>, <lock_host>
         | lock_type | lock_host  |
         | zookeeper | zookeeper1 |
+
+
+    Scenario Outline: Replication slots created automatically
+        Given a "pgconsul" container common config
+        """
+            pgconsul.conf:
+                global:
+                    priority: 0
+                    use_replication_slots: 'yes'
+                    quorum_commit: 'yes'
+                primary:
+                    change_replication_type: 'yes'
+                    primary_switch_checks: 1
+                replica:
+                    allow_potential_data_loss: 'no'
+                    primary_switch_checks: 1
+                    min_failover_timeout: 1
+                    primary_unavailability_timeout: 2
+                    recovery_timeout: 30
+                commands:
+                    generate_recovery_conf: /usr/local/bin/gen_rec_conf_without_slot.sh %m %p
+        """
+        Given a following cluster with "<lock_type>" without replication slots
+        """
+            postgresql1:
+                role: primary
+            postgresql2:
+                role: replica
+            postgresql3:
+                role: replica
+                config:
+                    pgconsul.conf:
+                        global:
+                            stream_from: pgconsul_postgresql2_1.pgconsul_pgconsul_net
+                stream_from: postgresql2
+        """
+
+        Then "<lock_type>" has holder "pgconsul_postgresql1_1.pgconsul_pgconsul_net" for lock "/pgconsul/postgresql/leader"
+        And container "postgresql2" is in quorum group
+        Then container "postgresql1" has following replication slots
+        """
+            - slot_name: pgconsul_postgresql2_1_pgconsul_pgconsul_net
+              slot_type: physical
+        """
+        And container "postgresql2" has following replication slots
+        """
+            - slot_name: pgconsul_postgresql3_1_pgconsul_pgconsul_net
+              slot_type: physical
+        """
+        When we disconnect from network container "postgresql3"
+        Then container "postgresql2" has following replication slots
+        """
+            - slot_name: pgconsul_postgresql3_1_pgconsul_pgconsul_net
+              slot_type: physical
+        """
+        When we wait "10.0" seconds
+        Then container "postgresql2" has following replication slots
+        """
+        """
+        And container "postgresql1" has following replication slots
+        """
+            - slot_name: pgconsul_postgresql2_1_pgconsul_pgconsul_net
+              slot_type: physical
+        """
+        When we connect to network container "postgresql3"
+        Then container "postgresql2" has following replication slots
+        """
+            - slot_name: pgconsul_postgresql3_1_pgconsul_pgconsul_net
+              slot_type: physical
+        """
+        When we disconnect from network container "postgresql2"
+        Then container "postgresql1" has following replication slots
+        """
+            - slot_name: pgconsul_postgresql2_1_pgconsul_pgconsul_net
+              slot_type: physical
+            - slot_name: pgconsul_postgresql3_1_pgconsul_pgconsul_net
+              slot_type: physical
+        """
+        When we wait "10.0" seconds
+        Then container "postgresql1" has following replication slots
+        """
+            - slot_name: pgconsul_postgresql3_1_pgconsul_pgconsul_net
+              slot_type: physical
+        """
+        When we connect to network container "postgresql2"
+        Then container "postgresql1" has following replication slots
+        """
+            - slot_name: pgconsul_postgresql2_1_pgconsul_pgconsul_net
+              slot_type: physical
+            - slot_name: pgconsul_postgresql3_1_pgconsul_pgconsul_net
+              slot_type: physical
+        """
+        And container "postgresql2" has following replication slots
+        """
+            - slot_name: pgconsul_postgresql3_1_pgconsul_pgconsul_net
+              slot_type: physical
+        """
+        When we wait "10.0" seconds
+        Then container "postgresql1" has following replication slots
+        """
+            - slot_name: pgconsul_postgresql2_1_pgconsul_pgconsul_net
+              slot_type: physical
+        """
+        And container "postgresql2" has following replication slots
+        """
+            - slot_name: pgconsul_postgresql3_1_pgconsul_pgconsul_net
+              slot_type: physical
+        """
+    Examples: <lock_type>, <lock_host>
+        | lock_type | lock_host  |
+        | zookeeper | zookeeper1 |
diff --git a/tests/features/kill_primary.feature b/tests/features/kill_primary.feature
index 27782e2..05d5071 100644
--- a/tests/features/kill_primary.feature
+++ b/tests/features/kill_primary.feature
@@ -344,7 +344,12 @@ Feature: Destroy primary in various scenarios
             state: streaming
         """
     When we stop container "postgresql3"
-    When we drop replication slot "pgconsul_postgresql3_1_pgconsul_pgconsul_net" in container "postgresql1"
+    And we wait "10.0" seconds
+    Then container "postgresql1" has following replication slots
+    """
+        - slot_name: pgconsul_postgresql2_1_pgconsul_pgconsul_net
+          slot_type: physical
+    """
     When we start container "postgresql3"
     Then "<lock_type>" has value "['pgconsul_postgresql2_1.pgconsul_pgconsul_net']" for key "/pgconsul/postgresql/quorum"
     When we wait "10.0" seconds
diff --git a/tests/steps/database.py b/tests/steps/database.py
index 002199b..589a4ad 100644
--- a/tests/steps/database.py
+++ b/tests/steps/database.py
@@ -8,6 +8,7 @@
 
 import psycopg2
 from psycopg2.extras import RealDictCursor
+from psycopg2.errors import DuplicateObject
 
 import select
 
@@ -111,13 +112,16 @@ def get_config_option(self, option):
         return deepcopy(self.cursor.fetchone())['opt']
 
     def create_replication_slot(self, slot_name):
-        self.cursor.execute(
-            """
-            SELECT pg_create_physical_replication_slot(%(name)s)
-            """,
-            {'name': slot_name},
-        )
-        return deepcopy(self.cursor.fetchone())
+        try:
+            self.cursor.execute(
+                """
+                SELECT pg_create_physical_replication_slot(%(name)s)
+                """,
+                {'name': slot_name},
+            )
+            return deepcopy(self.cursor.fetchone())
+        except DuplicateObject:
+            return True
 
     def get_replication_slots(self):
         self.cursor.execute(
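Note: the database.py step above makes slot creation idempotent, since pgconsul may now re-create a slot between test steps. A condensed, self-contained version of the same pattern; the connection parameters are illustrative:

    import psycopg2
    from psycopg2.errors import DuplicateObject

    def ensure_physical_slot(conn, slot_name):
        """Create a physical replication slot, tolerating an existing one."""
        with conn.cursor() as cur:
            try:
                cur.execute(
                    'SELECT pg_create_physical_replication_slot(%(name)s)',
                    {'name': slot_name},
                )
                return cur.fetchone()
            except DuplicateObject:
                # The slot already exists: treat repeated creation as success.
                return True

    conn = psycopg2.connect('host=localhost dbname=postgres user=postgres')
    conn.autocommit = True  # keep each step outside an explicit transaction
    print(ensure_physical_slot(conn, 'example_slot'))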