From b284dc1b068e397ad87fe4154e640af60d364c6d Mon Sep 17 00:00:00 2001 From: Kevin Hunter Kesling Date: Wed, 21 Aug 2024 12:32:32 -0400 Subject: [PATCH 1/6] Non-functional change: minor log call-site updates (#3597) Massage log statements to use argument style consistent with recent practice --- .../executors/high_throughput/interchange.py | 60 ++++++++++--------- 1 file changed, 31 insertions(+), 29 deletions(-) diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index fa0969d398..cd7d0596a9 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -375,7 +375,7 @@ def start(self) -> None: self.zmq_context.destroy() delta = time.time() - start - logger.info("Processed {} tasks in {} seconds".format(self.count, delta)) + logger.info(f"Processed {self.count} tasks in {delta} seconds") logger.warning("Exiting") def process_task_outgoing_incoming( @@ -396,9 +396,8 @@ def process_task_outgoing_incoming( try: msg = json.loads(message[1].decode('utf-8')) except Exception: - logger.warning("Got Exception reading message from manager: {!r}".format( - manager_id), exc_info=True) - logger.debug("Message: \n{!r}\n".format(message[1])) + logger.warning(f"Got Exception reading message from manager: {manager_id!r}", exc_info=True) + logger.debug("Message:\n %r\n", message[1]) return # perform a bit of validation on the structure of the deserialized @@ -406,7 +405,7 @@ def process_task_outgoing_incoming( # in obviously malformed cases if not isinstance(msg, dict) or 'type' not in msg: logger.error(f"JSON message was not correctly formatted from manager: {manager_id!r}") - logger.debug("Message: \n{!r}\n".format(message[1])) + logger.debug("Message:\n %r\n", message[1]) return if msg['type'] == 'registration': @@ -425,7 +424,7 @@ def process_task_outgoing_incoming( self.connected_block_history.append(msg['block_id']) interesting_managers.add(manager_id) - logger.info("Adding manager: {!r} to ready queue".format(manager_id)) + logger.info(f"Adding manager: {manager_id!r} to ready queue") m = self._ready_managers[manager_id] # m is a ManagerRecord, but msg is a dict[Any,Any] and so can @@ -434,12 +433,12 @@ def process_task_outgoing_incoming( # later. 
m.update(msg) # type: ignore[typeddict-item] - logger.info("Registration info for manager {!r}: {}".format(manager_id, msg)) + logger.info(f"Registration info for manager {manager_id!r}: {msg}") self._send_monitoring_info(monitoring_radio, m) if (msg['python_v'].rsplit(".", 1)[0] != self.current_platform['python_v'].rsplit(".", 1)[0] or msg['parsl_v'] != self.current_platform['parsl_v']): - logger.error("Manager {!r} has incompatible version info with the interchange".format(manager_id)) + logger.error(f"Manager {manager_id!r} has incompatible version info with the interchange") logger.debug("Setting kill event") kill_event.set() e = VersionMismatch("py.v={} parsl.v={}".format(self.current_platform['python_v'].rsplit(".", 1)[0], @@ -452,16 +451,15 @@ def process_task_outgoing_incoming( self.results_outgoing.send(pkl_package) logger.error("Sent failure reports, shutting down interchange") else: - logger.info("Manager {!r} has compatible Parsl version {}".format(manager_id, msg['parsl_v'])) - logger.info("Manager {!r} has compatible Python version {}".format(manager_id, - msg['python_v'].rsplit(".", 1)[0])) + logger.info(f"Manager {manager_id!r} has compatible Parsl version {msg['parsl_v']}") + logger.info(f"Manager {manager_id!r} has compatible Python version {msg['python_v'].rsplit('.', 1)[0]}") elif msg['type'] == 'heartbeat': self._ready_managers[manager_id]['last_heartbeat'] = time.time() - logger.debug("Manager {!r} sent heartbeat via tasks connection".format(manager_id)) + logger.debug("Manager %r sent heartbeat via tasks connection", manager_id) self.task_outgoing.send_multipart([manager_id, b'', PKL_HEARTBEAT_CODE]) elif msg['type'] == 'drain': self._ready_managers[manager_id]['draining'] = True - logger.debug(f"Manager {manager_id!r} requested drain") + logger.debug("Manager %r requested drain", manager_id) else: logger.error(f"Unexpected message type received from manager: {msg['type']}") logger.debug("leaving task_outgoing section") @@ -484,9 +482,11 @@ def expire_drained_managers(self, interesting_managers: Set[bytes], monitoring_r def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None: # Check if there are tasks that could be sent to managers - logger.debug("Managers count (interesting/total): {interesting}/{total}".format( - total=len(self._ready_managers), - interesting=len(interesting_managers))) + logger.debug( + "Managers count (interesting/total): {}/{}", + len(interesting_managers), + len(self._ready_managers) + ) if interesting_managers and not self.pending_task_queue.empty(): shuffled_managers = self.manager_selector.sort_managers(self._ready_managers, interesting_managers) @@ -497,7 +497,7 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None: tasks_inflight = len(m['tasks']) real_capacity = m['max_capacity'] - tasks_inflight - if (real_capacity and m['active'] and not m['draining']): + if real_capacity and m["active"] and not m["draining"]: tasks = self.get_tasks(real_capacity) if tasks: self.task_outgoing.send_multipart([manager_id, b'', pickle.dumps(tasks)]) @@ -506,19 +506,19 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None: tids = [t['task_id'] for t in tasks] m['tasks'].extend(tids) m['idle_since'] = None - logger.debug("Sent tasks: {} to manager {!r}".format(tids, manager_id)) + logger.debug("Sent tasks: %s to manager %r", tids, manager_id) # recompute real_capacity after sending tasks real_capacity = m['max_capacity'] - tasks_inflight if real_capacity > 0: - logger.debug("Manager {!r} has 
free capacity {}".format(manager_id, real_capacity)) + logger.debug("Manager %r has free capacity %s", manager_id, real_capacity) # ... so keep it in the interesting_managers list else: - logger.debug("Manager {!r} is now saturated".format(manager_id)) + logger.debug("Manager %r is now saturated", manager_id) interesting_managers.remove(manager_id) else: interesting_managers.remove(manager_id) # logger.debug("Nothing to send to manager {}".format(manager_id)) - logger.debug("leaving _ready_managers section, with {} managers still interesting".format(len(interesting_managers))) + logger.debug("leaving _ready_managers section, with %s managers still interesting", len(interesting_managers)) else: logger.debug("either no interesting managers or no tasks, so skipping manager pass") @@ -528,9 +528,9 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_ logger.debug("entering results_incoming section") manager_id, *all_messages = self.results_incoming.recv_multipart() if manager_id not in self._ready_managers: - logger.warning("Received a result from a un-registered manager: {!r}".format(manager_id)) + logger.warning(f"Received a result from a un-registered manager: {manager_id!r}") else: - logger.debug(f"Got {len(all_messages)} result items in batch from manager {manager_id!r}") + logger.debug("Got %s result items in batch from manager %r", len(all_messages), manager_id) b_messages = [] @@ -548,10 +548,10 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_ monitoring_radio.send(r['payload']) elif r['type'] == 'heartbeat': - logger.debug(f"Manager {manager_id!r} sent heartbeat via results connection") + logger.debug("Manager %r sent heartbeat via results connection", manager_id) b_messages.append((p_message, r)) else: - logger.error("Interchange discarding result_queue message of unknown type: {}".format(r['type'])) + logger.error("Interchange discarding result_queue message of unknown type: %s", r["type"]) got_result = False m = self._ready_managers[manager_id] @@ -560,14 +560,16 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_ if r['type'] == 'result': got_result = True try: - logger.debug(f"Removing task {r['task_id']} from manager record {manager_id!r}") + logger.debug("Removing task %s from manager record %r", r["task_id"], manager_id) m['tasks'].remove(r['task_id']) except Exception: # If we reach here, there's something very wrong. 
- logger.exception("Ignoring exception removing task_id {} for manager {!r} with task list {}".format( + logger.exception( + "Ignoring exception removing task_id %s for manager %r with task list %s", r['task_id'], manager_id, - m['tasks'])) + m["tasks"] + ) b_messages_to_send = [] for (b_message, _) in b_messages: @@ -578,7 +580,7 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_ self.results_outgoing.send_multipart(b_messages_to_send) logger.debug("Sent messages on results_outgoing") - logger.debug(f"Current tasks on manager {manager_id!r}: {m['tasks']}") + logger.debug("Current tasks on manager %r: %s", manager_id, m["tasks"]) if len(m['tasks']) == 0 and m['idle_since'] is None: m['idle_since'] = time.time() From 789ee82606b4a625bfa4c8d2c1870f7e09456821 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 22 Aug 2024 17:21:33 +0200 Subject: [PATCH 2/6] Fix incorrect string template introduced in PR #3597 (#3599) --- parsl/executors/high_throughput/interchange.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index cd7d0596a9..d61c76fed2 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -483,7 +483,7 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None: # Check if there are tasks that could be sent to managers logger.debug( - "Managers count (interesting/total): {}/{}", + "Managers count (interesting/total): %d/%d", len(interesting_managers), len(self._ready_managers) ) From 4ea3fbc1ef5277bf50bd8898e3fad680d5aaa4ae Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 26 Aug 2024 17:10:05 +0200 Subject: [PATCH 3/6] Make deliberately scaled-in unstarted blocks not be failures (#3594) This PR adds a new terminal job state, SCALED_IN. None of the existing providers will return it, but the scaling layer will use it to mark a job as deliberately scaled in, so that error handling code will not regard it as failed. 
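
The intent, roughly, is that error-handling code can now tell a deliberate
scale-in apart from a block that genuinely failed or went missing. A
simplified sketch of that distinction (illustrative only, not the actual
error-handler code; count_real_failures is a made-up helper):

    from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus

    def count_real_failures(statuses):
        # Terminal blocks still count as failures unless they completed
        # normally or were deliberately scaled in.
        return sum(1 for s in statuses.values()
                   if s.state in TERMINAL_STATES
                   and s.state not in (JobState.COMPLETED, JobState.SCALED_IN))

    # A block cancelled by the scaling layer is terminal but not a failure:
    statuses = {"0": JobStatus(JobState.SCALED_IN), "1": JobStatus(JobState.FAILED)}
    assert count_real_failures(statuses) == 1
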
Fixes #3568 --- parsl/executors/high_throughput/executor.py | 3 +- parsl/executors/status_handling.py | 7 +- parsl/jobs/states.py | 7 +- .../test_multiple_disconnected_blocks.py | 8 +- .../test_htex_init_blocks_vs_monitoring.py | 2 +- ...st_regression_3568_scaledown_vs_MISSING.py | 85 +++++++++++++++++++ 6 files changed, 102 insertions(+), 10 deletions(-) create mode 100644 parsl/tests/test_scaling/test_regression_3568_scaledown_vs_MISSING.py diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index c4097500f1..0e0ea9c892 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -790,7 +790,8 @@ def status(self) -> Dict[str, JobStatus]: connected_blocks = self.connected_blocks() for job_id in job_status: job_info = job_status[job_id] - if job_info.terminal and job_id not in connected_blocks: + if job_info.terminal and job_id not in connected_blocks and job_info.state != JobState.SCALED_IN: + logger.debug("Rewriting job %s from status %s to MISSING", job_id, job_info) job_status[job_id].state = JobState.MISSING if job_status[job_id].message is None: job_status[job_id].message = ( diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py index 34db2300f6..615f09de78 100644 --- a/parsl/executors/status_handling.py +++ b/parsl/executors/status_handling.py @@ -347,7 +347,10 @@ def scale_in_facade(self, n: int, max_idletime: Optional[float] = None) -> List[ if block_ids is not None: new_status = {} for block_id in block_ids: - new_status[block_id] = JobStatus(JobState.CANCELLED) - del self._status[block_id] + logger.debug("Marking block %s as SCALED_IN", block_id) + s = JobStatus(JobState.SCALED_IN) + new_status[block_id] = s + self._status[block_id] = s + self._simulated_status[block_id] = s self.send_monitoring_info(new_status) return block_ids diff --git a/parsl/jobs/states.py b/parsl/jobs/states.py index 7ba4aae94e..792a515bac 100644 --- a/parsl/jobs/states.py +++ b/parsl/jobs/states.py @@ -46,12 +46,17 @@ class JobState(IntEnum): bad worker environment or network connectivity issues. """ + SCALED_IN = 9 + """This job has been deliberately scaled in. Scaling code should not be concerned + that the job never ran (for example for error handling purposes). 
+ """ + def __str__(self) -> str: return f"{self.__class__.__name__}.{self.name}" TERMINAL_STATES = [JobState.CANCELLED, JobState.COMPLETED, JobState.FAILED, - JobState.TIMEOUT, JobState.MISSING] + JobState.TIMEOUT, JobState.MISSING, JobState.SCALED_IN] class JobStatus: diff --git a/parsl/tests/test_htex/test_multiple_disconnected_blocks.py b/parsl/tests/test_htex/test_multiple_disconnected_blocks.py index 159c20f58d..4168f41b79 100644 --- a/parsl/tests/test_htex/test_multiple_disconnected_blocks.py +++ b/parsl/tests/test_htex/test_multiple_disconnected_blocks.py @@ -21,16 +21,14 @@ def local_config(): poll_period=100, max_workers_per_node=1, provider=LocalProvider( - worker_init="conda deactivate; export PATH=''; which python; exit 0", - init_blocks=2, - max_blocks=4, - min_blocks=0, + worker_init="exit 0", + init_blocks=2 ), ) ], run_dir="/tmp/test_htex", max_idletime=0.5, - strategy='htex_auto_scale', + strategy='none', ) diff --git a/parsl/tests/test_monitoring/test_htex_init_blocks_vs_monitoring.py b/parsl/tests/test_monitoring/test_htex_init_blocks_vs_monitoring.py index eb7a25003b..ada972e747 100644 --- a/parsl/tests/test_monitoring/test_htex_init_blocks_vs_monitoring.py +++ b/parsl/tests/test_monitoring/test_htex_init_blocks_vs_monitoring.py @@ -78,6 +78,6 @@ def test_row_counts(tmpd_cwd, strategy): (c, ) = result.first() assert c == 1, "There should be a single pending status" - result = connection.execute(text("SELECT COUNT(*) FROM block WHERE block_id = 0 AND status = 'CANCELLED' AND run_id = :run_id"), binds) + result = connection.execute(text("SELECT COUNT(*) FROM block WHERE block_id = 0 AND status = 'SCALED_IN' AND run_id = :run_id"), binds) (c, ) = result.first() assert c == 1, "There should be a single cancelled status" diff --git a/parsl/tests/test_scaling/test_regression_3568_scaledown_vs_MISSING.py b/parsl/tests/test_scaling/test_regression_3568_scaledown_vs_MISSING.py new file mode 100644 index 0000000000..a56b53af10 --- /dev/null +++ b/parsl/tests/test_scaling/test_regression_3568_scaledown_vs_MISSING.py @@ -0,0 +1,85 @@ +import time + +import pytest + +import parsl +from parsl.channels import LocalChannel +from parsl.config import Config +from parsl.executors import HighThroughputExecutor +from parsl.launchers import WrappedLauncher +from parsl.providers import LocalProvider + + +def local_config(): + # see the comments inside test_regression for reasoning about why each + # of these parameters is set why it is. + return Config( + max_idletime=1, + + strategy='htex_auto_scale', + strategy_period=1, + + executors=[ + HighThroughputExecutor( + label="htex_local", + encrypted=True, + provider=LocalProvider( + init_blocks=1, + min_blocks=0, + max_blocks=1, + launcher=WrappedLauncher(prepend="sleep inf ; "), + ), + ) + ], + ) + + +@parsl.python_app +def task(): + return 7 + + +@pytest.mark.local +def test_regression(try_assert): + # The above config means that we should start scaling out one initial + # block, but then scale it back in after a second or so if the executor + # is kept idle (which this test does using try_assert). + + # Because of 'sleep inf' in the WrappedLaucher, the block will not ever + # register. + + # The bug being tested is about mistreatment of blocks which are scaled in + # before they have a chance to register, and the above forces that to + # happen. + + # After that scaling in has happened, we should see that we have one block + # and it should be in a terminal state. The below try_assert waits for + # that to become true. 
+ + # At that time, we should also see htex reporting no blocks registered - as + # mentioned above, that is a necessary part of the bug being tested here. + + # Give 10 strategy periods for the above to happen: each step of scale up, + # and scale down due to idleness isn't guaranteed to happen in exactly one + # scaling step. + + htex = parsl.dfk().executors['htex_local'] + + try_assert(lambda: len(htex.status_facade) == 1 and htex.status_facade['0'].terminal, + timeout_ms=10000) + + assert htex.connected_blocks() == [], "No block should have connected to interchange" + + # Now we can reconfigure the launcher to let subsequent blocks launch ok, + # and run a trivial task. That trivial task will scale up a new block and + # run the task successfully. + + # Prior to issue #3568, the bug was that the scale in of the first + # block earlier in the test case would have incorrectly been treated as a + # failure, and then the block error handler would have treated that failure + # as a permanent htex failure, and so the task execution below would raise + # a BadStateException rather than attempt to run the task. + + assert htex.provider.launcher.prepend != "", "Pre-req: prepend attribute should exist and be non-empty" + htex.provider.launcher.prepend = "" + assert task().result() == 7 From 0a198a3abfd9e8780563d073ac618bc3e6dfc643 Mon Sep 17 00:00:00 2001 From: Yadu Nand Babuji Date: Tue, 27 Aug 2024 13:08:02 -0500 Subject: [PATCH 4/6] Add breadcrumbs for users trying to match configs to their machines (#3603) Both Polaris and Perlmutter have configuration documentation hosted by their respective facilities. This is great, but it makes it harder for users trying to match an undocumented machine to existing configs. This PR adds some hints to both Polaris and Perlmutter listing the provider and launcher combo used in the hopes that it might lead the user to configs from those machines. --- docs/userguide/configuring.rst | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/userguide/configuring.rst b/docs/userguide/configuring.rst index f3fe5cc407..3933695eb3 100644 --- a/docs/userguide/configuring.rst +++ b/docs/userguide/configuring.rst @@ -542,6 +542,9 @@ Perlmutter (NERSC) ------------------ NERSC provides documentation on `how to use Parsl on Perlmutter `_. +Perlmutter is a Slurm based HPC system and parsl uses `parsl.providers.SlurmProvider` with `parsl.launchers.SrunLauncher` +to launch tasks onto this machine. + Frontera (TACC) --------------- @@ -599,6 +602,8 @@ Polaris (ALCF) :width: 75% ALCF provides documentation on `how to use Parsl on Polaris `_. +Polaris uses `parsl.providers.PBSProProvider` and `parsl.launchers.MpiExecLauncher` to launch tasks onto the HPC system. 
+ Stampede2 (TACC) From 3f2bf1865eea16cc44d6b7f8938a1ae1781c61fd Mon Sep 17 00:00:00 2001 From: Yadu Nand Babuji Date: Wed, 28 Aug 2024 11:28:54 -0500 Subject: [PATCH 5/6] Adding configuration example for Improv@LCRC (#3602) --- docs/reference.rst | 1 + docs/userguide/configuring.rst | 12 ++++++++++++ parsl/configs/improv.py | 34 ++++++++++++++++++++++++++++++++++ 3 files changed, 47 insertions(+) create mode 100644 parsl/configs/improv.py diff --git a/docs/reference.rst b/docs/reference.rst index d8e18bd244..f2d89afaf8 100644 --- a/docs/reference.rst +++ b/docs/reference.rst @@ -93,6 +93,7 @@ Launchers parsl.launchers.SrunMPILauncher parsl.launchers.GnuParallelLauncher parsl.launchers.MpiExecLauncher + parsl.launchers.MpiRunLauncher parsl.launchers.JsrunLauncher parsl.launchers.WrappedLauncher diff --git a/docs/userguide/configuring.rst b/docs/userguide/configuring.rst index 3933695eb3..a57e815fe7 100644 --- a/docs/userguide/configuring.rst +++ b/docs/userguide/configuring.rst @@ -536,6 +536,18 @@ Center's **Expanse** supercomputer. The example is designed to be executed on th .. literalinclude:: ../../parsl/configs/expanse.py +Improv (Argonne LCRC) +--------------------- + +.. image:: https://www.lcrc.anl.gov/sites/default/files/styles/965_wide/public/2023-12/20231214_114057.jpg?itok=A-Rz5pP9 + +**Improv** is a PBS Pro based supercomputer at Argonne's Laboratory Computing Resource +Center (LCRC). The following snippet is an example configuration that uses `parsl.providers.PBSProProvider` +and `parsl.launchers.MpiRunLauncher` to run on multinode jobs. + +.. literalinclude:: ../../parsl/configs/improv.py + + .. _configuring_nersc_cori: Perlmutter (NERSC) diff --git a/parsl/configs/improv.py b/parsl/configs/improv.py new file mode 100644 index 0000000000..8a40282829 --- /dev/null +++ b/parsl/configs/improv.py @@ -0,0 +1,34 @@ +from parsl.config import Config +from parsl.executors import HighThroughputExecutor +from parsl.launchers import MpiRunLauncher +from parsl.providers import PBSProProvider + +config = Config( + executors=[ + HighThroughputExecutor( + label="Improv_multinode", + max_workers_per_node=32, + provider=PBSProProvider( + account="YOUR_ALLOCATION_ON_IMPROV", + # PBS directives (header lines), for example: + # scheduler_options='#PBS -l mem=4gb', + scheduler_options='', + + queue="compute", + + # Command to be run before starting a worker: + # **WARNING** Improv requires an openmpi module to be + # loaded for the MpiRunLauncher. Add additional env + # load commands to this multiline string. + worker_init=''' +module load gcc/13.2.0; +module load openmpi/5.0.3-gcc-13.2.0; ''', + launcher=MpiRunLauncher(), + + # number of compute nodes allocated for each block + nodes_per_block=2, + walltime='00:10:00' + ), + ), + ], +) From 3a256deff0547da8e9338e5e52017033b14480cc Mon Sep 17 00:00:00 2001 From: Kevin Hunter Kesling Date: Wed, 4 Sep 2024 11:50:54 -0400 Subject: [PATCH 6/6] Remove deprecated max_worker (#3605) Any workflows that still use max_worker will no longer simply get a warning message but still work. Users will need to change any scripts to use max_workers_per_node. (But they've had six months of warnings to do the deed!) 
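
For configs that still pass the old argument, the change is a one-word
rename. A minimal sketch (the label and worker count are placeholders):

    from parsl.executors import HighThroughputExecutor

    # Before: max_workers=4 produced a DeprecationWarning but still worked.
    # Now it is rejected outright (an unexpected-keyword TypeError), so use:
    htex = HighThroughputExecutor(label="htex_local", max_workers_per_node=4)
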
--- parsl/executors/high_throughput/executor.py | 18 +----------- parsl/tests/site_tests/README.rst | 29 ++++++++++--------- parsl/tests/test_htex/test_htex.py | 12 -------- parsl/tests/test_mpi_apps/test_mpiex.py | 2 +- .../test_scale_down_htex_auto_scale.py | 2 +- .../test_scale_down_htex_unregistered.py | 2 +- 6 files changed, 19 insertions(+), 46 deletions(-) diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 0e0ea9c892..e589975fb5 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -199,9 +199,6 @@ class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin, UsageIn will check the available memory at startup and limit the number of workers such that the there's sufficient memory for each worker. Default: None - max_workers : int - Deprecated. Please use max_workers_per_node instead. - max_workers_per_node : int Caps the number of workers launched per node. Default: None @@ -239,7 +236,6 @@ def __init__(self, worker_debug: bool = False, cores_per_worker: float = 1.0, mem_per_worker: Optional[float] = None, - max_workers: Optional[Union[int, float]] = None, max_workers_per_node: Optional[Union[int, float]] = None, cpu_affinity: str = 'none', available_accelerators: Union[int, Sequence[str]] = (), @@ -272,9 +268,7 @@ def __init__(self, else: self.all_addresses = ','.join(get_all_addresses()) - if max_workers: - self._warn_deprecated("max_workers", "max_workers_per_node") - self.max_workers_per_node = max_workers_per_node or max_workers or float("inf") + self.max_workers_per_node = max_workers_per_node or float("inf") mem_slots = self.max_workers_per_node cpu_slots = self.max_workers_per_node @@ -335,16 +329,6 @@ def _warn_deprecated(self, old: str, new: str): stacklevel=2 ) - @property - def max_workers(self): - self._warn_deprecated("max_workers", "max_workers_per_node") - return self.max_workers_per_node - - @max_workers.setter - def max_workers(self, val: Union[int, float]): - self._warn_deprecated("max_workers", "max_workers_per_node") - self.max_workers_per_node = val - @property def logdir(self): return "{}/{}".format(self.run_dir, self.label) diff --git a/parsl/tests/site_tests/README.rst b/parsl/tests/site_tests/README.rst index 80a9d9f3bc..85420c3b4c 100644 --- a/parsl/tests/site_tests/README.rst +++ b/parsl/tests/site_tests/README.rst @@ -46,27 +46,28 @@ Adding a new site 1. Specialized python builds for the system (for eg, Summit) 2. Anaconda available via modules 3. User's conda installation -* Add a new block to `conda_setup.sh` that installs a fresh environment and writes out - the activation commands to `~/setup_parsl_test_env.sh` -* Add a site config to `parsl/tests/configs/` and add your local user options - to `parsl/tests/configs/local_user_opts.py`. For eg, `here's mine`_ - Make sure that the site config uses the `fresh_config` pattern. +* Add a new block to ``conda_setup.sh`` that installs a fresh environment and writes out + the activation commands to ``~/setup_parsl_test_env.sh`` +* Add a site config to ``parsl/tests/configs/`` and add your local user options + to ``parsl/tests/configs/local_user_opts.py``. For example, + `here's mine`_ + Make sure that the site config uses the ``fresh_config`` pattern. 
Please ensure that the site config uses: - * max_workers = 1 - * init_blocks = 1 - * min_blocks = 0 + * ``max_workers_per_node = 1`` + * ``init_blocks = 1`` + * ``min_blocks = 0`` -* Add this site config to `parsl/tests/site_tests/site_config_selector.py` -* Reinstall parsl, using `pip install .` -* Test a single test: `python3 test_site.py -d` to confirm that the site works correctly. -* Once tests are passing run the whole site_test with `make site_test` +* Add this site config to ``parsl/tests/site_tests/site_config_selector.py`` +* Reinstall parsl, using ``pip install .`` +* Test a single test: ``python3 test_site.py -d`` to confirm that the site works correctly. +* Once tests are passing run the whole site_test with ``make site_test`` Shared filesystem option ------------------------ -There is a new env variable "SHARED_FS_OPTIONS" to pass markers to pytest to skip certain tests. +There is a new env variable ``SHARED_FS_OPTIONS`` to pass markers to pytest to skip certain tests. When there's a shared-FS, the default NoOpStaging works. However, when there's no shared-FS some tests that uses File objects require a staging provider (eg. rsync). These tests can be turned off with -`-k "not staging_required"` +``-k "not staging_required"`` diff --git a/parsl/tests/test_htex/test_htex.py b/parsl/tests/test_htex/test_htex.py index 810236c1b4..7781dc7bec 100644 --- a/parsl/tests/test_htex/test_htex.py +++ b/parsl/tests/test_htex/test_htex.py @@ -126,18 +126,6 @@ def kill_interchange(*args, **kwargs): assert "HighThroughputExecutor has not started" in caplog.text -@pytest.mark.local -def test_max_workers_per_node(): - with pytest.warns(DeprecationWarning) as record: - htex = HighThroughputExecutor(max_workers_per_node=1, max_workers=2) - - warning_msg = "max_workers is deprecated" - assert any(warning_msg in str(warning.message) for warning in record) - - # Ensure max_workers_per_node takes precedence - assert htex.max_workers_per_node == htex.max_workers == 1 - - @pytest.mark.local @pytest.mark.parametrize("cmd", (None, "custom-launch-cmd")) def test_htex_worker_pool_launch_cmd(cmd: Optional[str]): diff --git a/parsl/tests/test_mpi_apps/test_mpiex.py b/parsl/tests/test_mpi_apps/test_mpiex.py index 2e8a38bc68..e52eccd990 100644 --- a/parsl/tests/test_mpi_apps/test_mpiex.py +++ b/parsl/tests/test_mpi_apps/test_mpiex.py @@ -43,7 +43,7 @@ def test_init(): new_kwargs = {'max_workers_per_block', 'mpi_launcher'} excluded_kwargs = {'available_accelerators', 'cores_per_worker', 'max_workers_per_node', - 'mem_per_worker', 'cpu_affinity', 'max_workers', 'manager_selector'} + 'mem_per_worker', 'cpu_affinity', 'manager_selector'} # Get the kwargs from both HTEx and MPIEx htex_kwargs = set(signature(HighThroughputExecutor.__init__).parameters) diff --git a/parsl/tests/test_scaling/test_scale_down_htex_auto_scale.py b/parsl/tests/test_scaling/test_scale_down_htex_auto_scale.py index 424caac452..016a51dc48 100644 --- a/parsl/tests/test_scaling/test_scale_down_htex_auto_scale.py +++ b/parsl/tests/test_scaling/test_scale_down_htex_auto_scale.py @@ -23,7 +23,7 @@ def local_config(): poll_period=100, label="htex_local", address="127.0.0.1", - max_workers=1, + max_workers_per_node=1, encrypted=True, provider=LocalProvider( channel=LocalChannel(), diff --git a/parsl/tests/test_scaling/test_scale_down_htex_unregistered.py b/parsl/tests/test_scaling/test_scale_down_htex_unregistered.py index dd1b164478..529877eac7 100644 --- a/parsl/tests/test_scaling/test_scale_down_htex_unregistered.py +++ 
b/parsl/tests/test_scaling/test_scale_down_htex_unregistered.py @@ -27,7 +27,7 @@ def local_config(): poll_period=100, label="htex_local", address="127.0.0.1", - max_workers=1, + max_workers_per_node=1, encrypted=True, launch_cmd="sleep inf", provider=LocalProvider(