Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add taskvine temp/url handling to parsl/taskvine executor #3355

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
17 changes: 12 additions & 5 deletions parsl/executors/taskvine/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,11 @@ def __init__(self,
manager_config: TaskVineManagerConfig = TaskVineManagerConfig(),
factory_config: TaskVineFactoryConfig = TaskVineFactoryConfig(),
provider: Optional[ExecutionProvider] = LocalProvider(init_blocks=1),
working_dir: str = '.',
storage_access: Optional[List[Staging]] = None):

self.working_dir = working_dir

# Set worker launch option for this executor
if worker_launch_method == 'factory' or worker_launch_method == 'manual':
provider = None
Expand Down Expand Up @@ -215,9 +218,9 @@ def __create_data_and_logging_dirs(self):

# Create directories for data and results
log_dir = os.path.join(run_dir, self.label)
self._function_data_dir = os.path.join(run_dir, self.label, "function_data")
self._function_data_dir = os.path.join("/tmp/function_data/", self.label, str(uuid.uuid4()))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this a change to say "taskvine is going to manage movements of all these files off this local temporary directory"? if so, is that something that works independently of this PR #3355? (eg. if you did it as a different PR, is that what would happen without any other changes?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is not in the scope of this PR. I can go ahead and separate the two. This will work independently, and potentially be a benefit to others.

While I was developing this temp file effort I discovered my testing application was writing thousands of function data files to the shared file system, so it got thrown in here for the time being.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, then I think this definitely makes sense to turn into its own PR.

I think you need to consider who owns /tmp/function_data on a shared submit host (it's probably not you, but the first user who ever ran taskvine on this machine) - so maybe this direction should be something like /tmp/parsl-taskvine-{username} so both the guilty user and the guilty software are clearly identified.

os.makedirs(log_dir)
os.makedirs(self._function_data_dir)
os.makedirs(self._function_data_dir, exist_ok=True)

# put TaskVine logs outside of a Parsl run as TaskVine caches between runs while
# Parsl does not.
Expand Down Expand Up @@ -363,6 +366,7 @@ def submit(self, func, resource_specification, *args, **kwargs):

# Also consider any *arg that looks like a file as an input:
input_files += [self._register_file(f) for f in args if isinstance(f, File)]
logger.debug(f"registered input files {input_files}")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for debug logs in Parsl, @khk-globus and I have been pushing on a style that defers string interpolation until the log system has decided if it really wants to render that message - often that's not the case and rendering can be expensive. that format looks like this:

logger.debug("Creating process with command '%s'", cmd)

rather than using Python-level string formatting (either f-strings or "".format(


for kwarg, maybe_file in kwargs.items():
# Add appropriate input and output files from "stdout" and "stderr" keyword arguments
Expand Down Expand Up @@ -504,15 +508,18 @@ def _register_file(self, parsl_file):
if parsl_file.scheme == 'file' or \
(parsl_file.local_path and os.path.exists(parsl_file.local_path)):
to_stage = not os.path.isabs(parsl_file.filepath)

return ParslFileToVine(parsl_file.filepath, to_stage, to_cache)
return ParslFileToVine(parsl_file.filepath, parsl_file.netloc, to_stage, to_cache)
else:
# we must stage url and temp files
ptv = ParslFileToVine(parsl_file.url, parsl_file.netloc, True, to_cache)
return ptv

def _std_output_to_vine(self, fdname, stdfspec):
"""Find the name of the file that will contain stdout or stderr and
return a ParslFileToVine with it. These files are never cached"""
fname, mode = putils.get_std_fname_mode(fdname, stdfspec)
to_stage = not os.path.isabs(fname)
return ParslFileToVine(fname, stage=to_stage, cache=False)
return ParslFileToVine(fname, fname, stage=to_stage, cache=False)

def _prepare_package(self, fn, extra_pkgs):
""" Look at source code of apps to figure out their package depedencies
Expand Down
25 changes: 21 additions & 4 deletions parsl/executors/taskvine/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,15 @@ def _prepare_environment_regular(m, manager_config, t, task, poncho_env_to_file,
t.add_environment(poncho_env_file)


def _handle_file_declaration_protocol(m, filename, cache):
if "taskvinetemp://" in filename:
return m.declare_temp()
elif "https://" in filename or "http://" in filename:
return m.declare_url(filename, cache=cache, peer_transfer=True)
else:
return m.declare_file(filename, cache=cache, peer_transfer=True)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its a bit ugly that this isn't somehow using the parsed URL form of Parsl files to check the namespace, rather than doing ad-hoc basic URL parsing in this if statement... this is switching on the URI scheme only, I think, with the intention that it works like, for example, StubStaging.can_stage_* I think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a bit redundant compared to what the staging provider does, but there is not a clear pass-through to declare our vine_file objects through the staging provider. I made a change here which brings the protocol/scheme through to this spot where we can switch on that, a little nicer than url string comparisons.



@wrap_with_logs
def _taskvine_submit_wait(ready_task_queue=None,
finished_task_queue=None,
Expand Down Expand Up @@ -364,18 +373,26 @@ def _taskvine_submit_wait(ready_task_queue=None,
if spec.parsl_name in parsl_file_name_to_vine_file:
task_in_file = parsl_file_name_to_vine_file[spec.parsl_name]
else:
task_in_file = m.declare_file(spec.parsl_name, cache=spec.cache, peer_transfer=True)
task_in_file = _handle_file_declaration_protocol(m, spec.parsl_name, spec.cache)
parsl_file_name_to_vine_file[spec.parsl_name] = task_in_file
t.add_input(task_in_file, spec.parsl_name)
logger.debug("Adding input file {}, {} to TaskVine".format(spec.parsl_name, task.executor_id))
if spec.netloc == '':
t.add_input(task_in_file, spec.parsl_name)
else:
t.add_input(task_in_file, spec.netloc)
colinthomas-z80 marked this conversation as resolved.
Show resolved Hide resolved

for spec in task.output_files:
if spec.stage:
if spec.parsl_name in parsl_file_name_to_vine_file:
task_out_file = parsl_file_name_to_vine_file[spec.parsl_name]
else:
task_out_file = m.declare_file(spec.parsl_name, cache=spec.cache, peer_transfer=True)
task_out_file = _handle_file_declaration_protocol(m, spec.parsl_name, spec.cache)
parsl_file_name_to_vine_file[spec.parsl_name] = task_out_file
t.add_output(task_out_file, spec.parsl_name)
logger.debug("Adding output file {}, {} to TaskVine".format(spec.parsl_name, task.executor_id))
if spec.netloc == '':
t.add_output(task_out_file, spec.parsl_name)
else:
t.add_output(task_out_file, spec.netloc)

# Submit the task to the TaskVine object
logger.debug("Submitting executor task {}, {} to TaskVine".format(task.executor_id, t))
Expand Down
37 changes: 37 additions & 0 deletions parsl/executors/taskvine/stub_staging_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import logging
from concurrent.futures import Future
from typing import Optional

from parsl.app.futures import DataFuture
from parsl.data_provider.files import File
from parsl.data_provider.staging import Staging
from parsl.utils import RepresentationMixin

logger = logging.getLogger(__name__)

known_url_schemes = ["http", "https", "taskvinetemp"]


class StubStaging(Staging, RepresentationMixin):
colinthomas-z80 marked this conversation as resolved.
Show resolved Hide resolved

def can_stage_in(self, file):
logger.debug("Task vine staging provider checking passthrough for {}".format(repr(file)))
return file.scheme in known_url_schemes

def can_stage_out(self, file):
logger.debug("Task vine staging provider checking passthrough for {}".format(repr(file)))
return file.scheme in known_url_schemes

def stage_in(self, dm, executor: str, file: File, parent_fut: Optional[Future]) -> Optional[DataFuture]:
if file.netloc == '':
colinthomas-z80 marked this conversation as resolved.
Show resolved Hide resolved
file.netloc = file.filename
if file.scheme in ["taskvinetemp", "https", "http"]:
file.local_path = file.netloc
return None

def stage_out(self, dm, executor: str, file: File, app_fu: Future) -> Optional[Future]:
if file.netloc == '':
file.netloc = file.filename
if file.scheme in ["taskvinetemp", "https", "http"]:
file.local_path = file.netloc
return None
2 changes: 2 additions & 0 deletions parsl/executors/taskvine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,14 @@ class ParslFileToVine:
"""
def __init__(self,
parsl_name: str, # name of file
netloc: str, # name of file if url
colinthomas-z80 marked this conversation as resolved.
Show resolved Hide resolved
stage: bool, # whether TaskVine should know about this file
cache: bool # whether TaskVine should cache this file
):
self.parsl_name = parsl_name
self.stage = stage
self.cache = cache
self.netloc = netloc


def run_parsl_function(map_file, function_file, argument_file, result_file):
Expand Down
5 changes: 4 additions & 1 deletion parsl/tests/configs/taskvine_ex.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@
from parsl.data_provider.file_noop import NoOpFileStaging
from parsl.data_provider.ftp import FTPInTaskStaging
from parsl.data_provider.http import HTTPInTaskStaging
from parsl.data_provider.zip import ZipFileStaging
from parsl.executors.taskvine import TaskVineExecutor, TaskVineManagerConfig
from parsl.executors.taskvine.stub_staging_provider import StubStaging


def fresh_config():
return Config(executors=[TaskVineExecutor(manager_config=TaskVineManagerConfig(port=9000),
worker_launch_method='factory')])
worker_launch_method='factory',
storage_access=[FTPInTaskStaging(), StubStaging(), NoOpFileStaging(), ZipFileStaging()])])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this drops off the default HTTP provider, so it's testing that HTTP works via this new mechanism not the other Parsl mechanisms. that's good.

Maybe you can drop off NoOpFileStaging too? (NoOpFileStaging really means "use the shared filesystem" staging, and I think taskvine in general is trying to manage those files better than the shared filesystem can?)

or at least, what breaks if you take away NoOpFileStaging here?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think it would be good to add a taskvine specific local test case that validates that taskvinetemp: URLs work (at least that the file contents get moved around correctly) when used with the TaskVineExecutor - seeing as that's the core functionality of this PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We definitely want to keep the capability of using the shared filesystem. Removing NoOp staging fails some basic tests staging out local files. I don't see any reason at the moment to re implement local file handling through the new staging provider.

Loading