⏪ Bring back pathos (#30)
* Revert ":sparkles: Introduce maxtasksperchild to increase stability (#29)"

This reverts commit 50de05d.

* ✨ Introduce maxtasksperchild to increase stability
ddelange committed Sep 29, 2022
1 parent 50de05d commit cd77bd8
Showing 5 changed files with 18 additions and 10 deletions.
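
The feature being reverted and immediately reinstated, `maxtasksperchild`, caps how many tasks a worker process completes before the pool retires it and forks a fresh one, so memory accumulated by long-running workers is periodically released. A minimal sketch of the idea, mirroring the pool construction this commit lands on (the task function and numbers are illustrative, not repo code):

```python
# Illustrative sketch only: shows the maxtasksperchild behavior this commit
# reinstates, using the same ProcessPool construction as the diff below.
from pathos.multiprocessing import ProcessPool

def work(x):
    return x * x

# Each worker process is replaced after completing 4 tasks, so leaked or
# cached memory inside a worker cannot grow without bound.
pool = ProcessPool(2, maxtasksperchild=4)
try:
    print(list(pool.imap(work, range(10))))  # [0, 1, 4, 9, ..., 81]
finally:
    pool.clear()  # pathos caches pool instances; clear() releases this one
```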
Makefile (3 changes: 1 addition & 2 deletions)

@@ -16,8 +16,7 @@ showcov:
 .PHONY: install
 ## Install this repo, plus dev requirements, in editable mode
 install:
-	pip install -r requirements/ci.txt -r requirements/docs.txt
-	pip install -e .
+	pip install -r requirements/ci.txt -r requirements/docs.txt -e .
 	pre-commit install
 
 .PHONY: builddocs
README.md (2 changes: 1 addition & 1 deletion)

@@ -11,7 +11,7 @@
 
 ### mapply vs. pandarallel vs. swifter
 
-Where [`pandarallel`](https://github.com/nalepae/pandarallel) only requires [`dill`](https://github.com/uqfoundation/dill) (and therefore has to rely on in-house multiprocessing and progressbars), [`swifter`](https://github.com/jmcarpenter2/swifter) relies on the heavy [`dask`](https://github.com/dask/dask) framework, converting to Dask DataFrames and back. In an attempt to find the golden mean, `mapply` is highly customizable and remains lightweight, leveraging [`tqdm`](https://github.com/tqdm/tqdm) and [`multiprocess`](https://github.com/uqfoundation/multiprocess), which shadows Python's built-in multiprocessing module using [`dill`](https://github.com/uqfoundation/dill) for universal pickling.
+Where [`pandarallel`](https://github.com/nalepae/pandarallel) only requires [`dill`](https://github.com/uqfoundation/dill) (and therefore has to rely on in-house multiprocessing and progressbars), [`swifter`](https://github.com/jmcarpenter2/swifter) relies on the heavy [`dask`](https://github.com/dask/dask) framework, converting to Dask DataFrames and back. In an attempt to find the golden mean, `mapply` is highly customizable and remains lightweight, leveraging the powerful [`pathos`](https://github.com/uqfoundation/pathos) framework, which shadows Python's built-in multiprocessing module using `dill` for universal pickling.
 
 
 ## Installation
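
An aside to illustrate the "universal pickling" mentioned in the updated paragraph: `dill` serializes objects that the standard-library `pickle` rejects, such as lambdas, which is what lets a `pathos` pool ship arbitrary callables to worker processes. (Illustration only, not part of the diff.)

```python
# Illustration, not repo code: dill pickles a lambda where stdlib pickle fails.
import pickle

import dill

square = lambda x: x * x  # noqa: E731

try:
    pickle.dumps(square)
except pickle.PicklingError as exc:
    print("stdlib pickle failed:", exc)

restored = dill.loads(dill.dumps(square))
print(restored(4))  # -> 16
```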
docs/source/conf.py (2 changes: 1 addition & 1 deletion)

@@ -181,6 +181,6 @@ def linkcode_resolve(  # noqa:CCR001
 intersphinx_mapping = {
     "python": ("https://docs.python.org/3", None),
     "pandas": ("https://pandas.pydata.org/pandas-docs/stable", None),
-    "multiprocess": ("https://multiprocess.readthedocs.io/en/latest/", None),
+    "pathos": ("https://pathos.readthedocs.io/en/latest", None),
     # "tqdm": ("https://tqdm.github.io/docs/tqdm", None),  # mkdocs not working
 }
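
With the `pathos` inventory mapped, Sphinx can resolve cross-references like the one this commit adds to the `parallel.py` docstring below. A hypothetical docstring using the same role:

```python
# Hypothetical example: given the "pathos" intersphinx entry above, Sphinx
# resolves this :class: role against the pathos object inventory and links it.
def run_in_pool():
    """Distribute work over a :class:`pathos.multiprocessing.ProcessPool`."""
```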
requirements/prod.txt (2 changes: 1 addition & 1 deletion)

@@ -1,3 +1,3 @@
-multiprocess
+pathos>=0.2.0
 psutil
 tqdm>=4.27 # from tqdm.auto import tqdm
src/mapply/parallel.py (19 changes: 14 additions & 5 deletions)

@@ -1,4 +1,4 @@
-"""Submodule containing code to distribute computation over multiple processes using :class:`multiprocess.pool.Pool`.
+"""Submodule containing code to distribute computation over multiple processes using :class:`pathos.multiprocessing.ProcessPool`.
 
 Standalone usage:
 ::
@@ -24,7 +24,7 @@ def some_heavy_computation(x, power):
 from typing import Any, Callable, Iterable, Iterator, Optional
 
 import psutil
-from multiprocess.pool import Pool
+from pathos.multiprocessing import ProcessPool
 from tqdm.auto import tqdm as _tqdm
 
 logger = logging.getLogger(__name__)
@@ -38,7 +38,7 @@ def sensible_cpu_count() -> int:
 
 
 N_CORES = sensible_cpu_count()
-MAX_TASKS_PER_CHILD = os.environ.get("MAPPLY_MAX_TASKS_PER_CHILD", 4)
+MAX_TASKS_PER_CHILD = int(os.environ.get("MAPPLY_MAX_TASKS_PER_CHILD", 4))
 
 
 def _choose_n_workers(n_chunks: Optional[int], n_workers: int) -> int:
@@ -80,6 +80,10 @@ def multiprocessing_imap(
     Yields:
         Results in same order as input iterable.
 
+    Raises:
+        Exception: Any error occurred during computation (will terminate the pool early).
+        KeyboardInterrupt: Any KeyboardInterrupt sent by the user (will terminate the pool early).
+
     """
     n_chunks: Optional[int] = tqdm(iterable, disable=True).__len__()  # doesn't exhaust
     func = partial(func, *args, **kwargs)
@@ -92,7 +96,7 @@ def multiprocessing_imap(
         stage = map(func, iterable)
     else:
         logger.debug("Starting ProcessPool with %d workers", n_workers)
-        pool = Pool(n_workers, maxtasksperchild=MAX_TASKS_PER_CHILD)
+        pool = ProcessPool(n_workers, maxtasksperchild=MAX_TASKS_PER_CHILD)
 
         stage = pool.imap(func, iterable)
 
@@ -101,7 +105,12 @@
 
     try:
         yield from stage
-    finally:
+    except (Exception, KeyboardInterrupt):
+        if pool:
+            logger.debug("Terminating ProcessPool")
+            pool.terminate()
+        raise
+    finally:
         if pool:
             logger.debug("Closing ProcessPool")
             pool.clear()

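To tie the diff together, a hedged sketch of standalone usage of the reinstated code path (the task function mirrors the `some_heavy_computation` named in the module docstring; values are illustrative). Note that `MAX_TASKS_PER_CHILD` is a module-level constant, so the `MAPPLY_MAX_TASKS_PER_CHILD` override must be set before import:

```python
# Sketch of standalone usage of the pathos-backed multiprocessing_imap.
import os

# Read at import time in mapply.parallel, so set the override beforehand.
os.environ["MAPPLY_MAX_TASKS_PER_CHILD"] = "2"

from mapply.parallel import multiprocessing_imap

def some_heavy_computation(x, power):
    return x**power

# Results stream back in input order; on Exception or KeyboardInterrupt the
# pool is terminated early, and it is always cleared in the finally block.
results = list(multiprocessing_imap(some_heavy_computation, range(100), power=2))
print(results[:5])  # [0, 1, 4, 9, 16]
```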