From b3240ee4255cc297db6959908e276dc942525ee7 Mon Sep 17 00:00:00 2001 From: Colin Thomas Date: Sun, 7 Apr 2024 08:54:39 -0400 Subject: [PATCH 01/14] taskvine temp experiment --- parsl/executors/taskvine/manager.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/parsl/executors/taskvine/manager.py b/parsl/executors/taskvine/manager.py index e5a4986062..24182783e3 100644 --- a/parsl/executors/taskvine/manager.py +++ b/parsl/executors/taskvine/manager.py @@ -178,6 +178,8 @@ def _taskvine_submit_wait(ready_task_queue=None, # dict[str] -> vine File object parsl_file_name_to_vine_file = {} + parsl_file_possible_temp = {} + # Mapping of tasks from vine id to parsl id # Dict[str] -> str vine_id_to_executor_task_id = {} @@ -358,6 +360,8 @@ def _taskvine_submit_wait(ready_task_queue=None, # not staged by taskvine. # Files that share the same local path are assumed to be the same # and thus use the same Vine File object if detected. + temps = [] + if not manager_config.shared_fs: for spec in task.input_files: if spec.stage: @@ -366,14 +370,20 @@ def _taskvine_submit_wait(ready_task_queue=None, else: task_in_file = m.declare_file(spec.parsl_name, cache=spec.cache, peer_transfer=True) parsl_file_name_to_vine_file[spec.parsl_name] = task_in_file - t.add_input(task_in_file, spec.parsl_name) + if task_in_file in temps: + logger.debug("Adding strict input temp to TaskVine {}".format(task_in_file)) + t.add_input(task_in_file, spec.parsl_name, strict_input=True) + else: + t.add_input(task_in_file, spec.parsl_name) for spec in task.output_files: if spec.stage: if spec.parsl_name in parsl_file_name_to_vine_file: task_out_file = parsl_file_name_to_vine_file[spec.parsl_name] else: - task_out_file = m.declare_file(spec.parsl_name, cache=spec.cache, peer_transfer=True) + task_out_file = m.declare_temp() + #temps.append(task_out_file) +# task_out_file = m.declare_file(spec.parsl_name, cache=spec.cache, peer_transfer=True) parsl_file_name_to_vine_file[spec.parsl_name] = task_out_file t.add_output(task_out_file, spec.parsl_name) From 969cd275b9918c9b2f4e087508d2aac5cfd1b56e Mon Sep 17 00:00:00 2001 From: Colin Thomas Date: Thu, 11 Apr 2024 20:37:20 -0400 Subject: [PATCH 02/14] taskvine url and temps --- parsl/executors/taskvine/executor.py | 16 ++++++++-- parsl/executors/taskvine/manager.py | 27 ++++++++++------- .../taskvine/stub_staging_provider.py | 29 +++++++++++++++++++ 3 files changed, 58 insertions(+), 14 deletions(-) create mode 100644 parsl/executors/taskvine/stub_staging_provider.py diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index 6cfedf92bb..f36208a9c0 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -105,8 +105,11 @@ def __init__(self, manager_config: TaskVineManagerConfig = TaskVineManagerConfig(), factory_config: TaskVineFactoryConfig = TaskVineFactoryConfig(), provider: Optional[ExecutionProvider] = LocalProvider(init_blocks=1), + working_dir = '.', storage_access: Optional[List[Staging]] = None): + self.working_dir = working_dir + # Set worker launch option for this executor if worker_launch_method == 'factory' or worker_launch_method == 'manual': provider = None @@ -213,9 +216,11 @@ def __create_data_and_logging_dirs(self): # Use the current run directory from Parsl run_dir = self.run_dir + run_idx = run_dir.split("/")[-1] + # Create directories for data and results log_dir = os.path.join(run_dir, self.label) - self._function_data_dir = os.path.join(run_dir, self.label, "function_data") + self._function_data_dir = os.path.join(f"/tmp/function_data/", self.label, run_idx) os.makedirs(log_dir) os.makedirs(self._function_data_dir) @@ -363,6 +368,7 @@ def submit(self, func, resource_specification, *args, **kwargs): # Also consider any *arg that looks like a file as an input: input_files += [self._register_file(f) for f in args if isinstance(f, File)] + logger.debug(f"registered input files {input_files}") for kwarg, maybe_file in kwargs.items(): # Add appropriate input and output files from "stdout" and "stderr" keyword arguments @@ -504,8 +510,12 @@ def _register_file(self, parsl_file): if parsl_file.scheme == 'file' or \ (parsl_file.local_path and os.path.exists(parsl_file.local_path)): to_stage = not os.path.isabs(parsl_file.filepath) - - return ParslFileToVine(parsl_file.filepath, to_stage, to_cache) + return ParslFileToVine(parsl_file.filepath, to_stage, to_cache) + else: + parsl_file.local_path = parsl_file.url + parsl_file.filename = uuid.uuid4() + logger.debug(f"setting local path {parsl_file.local_path} to filename {parsl_file.filename}") + return ParslFileToVine(parsl_file.filepath, True, to_cache) def _std_output_to_vine(self, fdname, stdfspec): """Find the name of the file that will contain stdout or stderr and diff --git a/parsl/executors/taskvine/manager.py b/parsl/executors/taskvine/manager.py index 24182783e3..2489f9bee0 100644 --- a/parsl/executors/taskvine/manager.py +++ b/parsl/executors/taskvine/manager.py @@ -127,6 +127,14 @@ def _prepare_environment_regular(m, manager_config, t, task, poncho_env_to_file, if poncho_env_file is not None: t.add_environment(poncho_env_file) +def _handle_file_declaration_protocol(m, filename, cache): + if "taskvinetemp://" in filename: + return m.declare_temp() + elif "https://" in filename or "http://" in filename: + return m.declare_url(filename, cache=cache, peer_transfer=True) + else: + return m.declare_file(filename, cache=cache, peer_transfer=True) + @wrap_with_logs def _taskvine_submit_wait(ready_task_queue=None, @@ -360,7 +368,6 @@ def _taskvine_submit_wait(ready_task_queue=None, # not staged by taskvine. # Files that share the same local path are assumed to be the same # and thus use the same Vine File object if detected. - temps = [] if not manager_config.shared_fs: for spec in task.input_files: @@ -368,24 +375,22 @@ def _taskvine_submit_wait(ready_task_queue=None, if spec.parsl_name in parsl_file_name_to_vine_file: task_in_file = parsl_file_name_to_vine_file[spec.parsl_name] else: - task_in_file = m.declare_file(spec.parsl_name, cache=spec.cache, peer_transfer=True) + task_in_file = _handle_file_declaration_protocol(m, spec.parsl_name, spec.cache) parsl_file_name_to_vine_file[spec.parsl_name] = task_in_file - if task_in_file in temps: - logger.debug("Adding strict input temp to TaskVine {}".format(task_in_file)) - t.add_input(task_in_file, spec.parsl_name, strict_input=True) - else: - t.add_input(task_in_file, spec.parsl_name) + logger.debug("Adding input file {}, {} to TaskVine".format(task_in_file, task.executor_id)) +# split_name = spec.parsl_name.split("/")[-1] + t.add_input(task_in_file, spec.parsl_name) for spec in task.output_files: if spec.stage: if spec.parsl_name in parsl_file_name_to_vine_file: task_out_file = parsl_file_name_to_vine_file[spec.parsl_name] else: - task_out_file = m.declare_temp() - #temps.append(task_out_file) -# task_out_file = m.declare_file(spec.parsl_name, cache=spec.cache, peer_transfer=True) + task_out_file = _handle_file_declaration_protocol(m, spec.parsl_name, spec.cache) parsl_file_name_to_vine_file[spec.parsl_name] = task_out_file - t.add_output(task_out_file, spec.parsl_name) + logger.debug("Adding output file {}, {} to TaskVine".format(task_out_file, task.executor_id)) + split_name = spec.parsl_name.split("/")[-1] + t.add_output(task_out_file, split_name) # Submit the task to the TaskVine object logger.debug("Submitting executor task {}, {} to TaskVine".format(task.executor_id, t)) diff --git a/parsl/executors/taskvine/stub_staging_provider.py b/parsl/executors/taskvine/stub_staging_provider.py new file mode 100644 index 0000000000..0cfd09efa6 --- /dev/null +++ b/parsl/executors/taskvine/stub_staging_provider.py @@ -0,0 +1,29 @@ +import logging + +from parsl.utils import RepresentationMixin +from parsl.data_provider.staging import Staging +from parsl.data_provider.files import File +from concurrent.futures import Future +from parsl.app.futures import DataFuture +from typing import Optional, Callable +import os + +logger = logging.getLogger(__name__) + +known_url_schemes = ["file", "http", "https", "taskvinetemp"] + +class StubStaging(Staging, RepresentationMixin): + + def can_stage_in(self, file): + logger.debug("Task vine staging provider checking passthrough for {}".format(repr(file))) + return file.scheme in known_url_schemes + + def can_stage_out(self, file): + logger.debug("Task vine staging provider checking passthrough for {}".format(repr(file))) + return file.scheme in known_url_schemes + + def stage_in(self, dm: "DataManager", executor: str, file: File, parent_fut: Optional[Future]) -> Optional[DataFuture]: + return None + + def stage_out(self, dm: "DataManager", executor: str, file: File, app_fu: Future) -> Optional[Future]: + return None From 3e5c05dd04bada379e69babdb50b731a66284e36 Mon Sep 17 00:00:00 2001 From: Colin Thomas Date: Fri, 12 Apr 2024 17:28:42 -0400 Subject: [PATCH 03/14] working on names --- parsl/executors/taskvine/executor.py | 11 +++++------ parsl/executors/taskvine/manager.py | 4 ++-- parsl/executors/taskvine/stub_staging_provider.py | 2 ++ parsl/executors/taskvine/utils.py | 3 +++ 4 files changed, 12 insertions(+), 8 deletions(-) diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index f36208a9c0..7ff980c328 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -510,19 +510,18 @@ def _register_file(self, parsl_file): if parsl_file.scheme == 'file' or \ (parsl_file.local_path and os.path.exists(parsl_file.local_path)): to_stage = not os.path.isabs(parsl_file.filepath) - return ParslFileToVine(parsl_file.filepath, to_stage, to_cache) + return ParslFileToVine(parsl_file.filepath, parsl_file.netloc, to_stage, to_cache) else: - parsl_file.local_path = parsl_file.url - parsl_file.filename = uuid.uuid4() - logger.debug(f"setting local path {parsl_file.local_path} to filename {parsl_file.filename}") - return ParslFileToVine(parsl_file.filepath, True, to_cache) + # we must stage url and temp files + ptv = ParslFileToVine(parsl_file.url, parsl_file.netloc, True, to_cache) + return ptv def _std_output_to_vine(self, fdname, stdfspec): """Find the name of the file that will contain stdout or stderr and return a ParslFileToVine with it. These files are never cached""" fname, mode = putils.get_std_fname_mode(fdname, stdfspec) to_stage = not os.path.isabs(fname) - return ParslFileToVine(fname, stage=to_stage, cache=False) + return ParslFileToVine(fname, fname, stage=to_stage, cache=False) def _prepare_package(self, fn, extra_pkgs): """ Look at source code of apps to figure out their package depedencies diff --git a/parsl/executors/taskvine/manager.py b/parsl/executors/taskvine/manager.py index 2489f9bee0..b118c20766 100644 --- a/parsl/executors/taskvine/manager.py +++ b/parsl/executors/taskvine/manager.py @@ -379,7 +379,7 @@ def _taskvine_submit_wait(ready_task_queue=None, parsl_file_name_to_vine_file[spec.parsl_name] = task_in_file logger.debug("Adding input file {}, {} to TaskVine".format(task_in_file, task.executor_id)) # split_name = spec.parsl_name.split("/")[-1] - t.add_input(task_in_file, spec.parsl_name) + t.add_input(task_in_file, spec.netloc) for spec in task.output_files: if spec.stage: @@ -390,7 +390,7 @@ def _taskvine_submit_wait(ready_task_queue=None, parsl_file_name_to_vine_file[spec.parsl_name] = task_out_file logger.debug("Adding output file {}, {} to TaskVine".format(task_out_file, task.executor_id)) split_name = spec.parsl_name.split("/")[-1] - t.add_output(task_out_file, split_name) + t.add_output(task_out_file, spec.netloc) # Submit the task to the TaskVine object logger.debug("Submitting executor task {}, {} to TaskVine".format(task.executor_id, t)) diff --git a/parsl/executors/taskvine/stub_staging_provider.py b/parsl/executors/taskvine/stub_staging_provider.py index 0cfd09efa6..2984196b51 100644 --- a/parsl/executors/taskvine/stub_staging_provider.py +++ b/parsl/executors/taskvine/stub_staging_provider.py @@ -23,7 +23,9 @@ def can_stage_out(self, file): return file.scheme in known_url_schemes def stage_in(self, dm: "DataManager", executor: str, file: File, parent_fut: Optional[Future]) -> Optional[DataFuture]: + file.local_path = file.netloc return None def stage_out(self, dm: "DataManager", executor: str, file: File, app_fu: Future) -> Optional[Future]: + file.local_path = file.netloc return None diff --git a/parsl/executors/taskvine/utils.py b/parsl/executors/taskvine/utils.py index 86cf446b1a..af79ebe1e6 100644 --- a/parsl/executors/taskvine/utils.py +++ b/parsl/executors/taskvine/utils.py @@ -1,4 +1,5 @@ from typing import Optional +import uuid class ParslTaskToVine: @@ -69,12 +70,14 @@ class ParslFileToVine: """ def __init__(self, parsl_name: str, # name of file + netloc: str, # name of file if url stage: bool, # whether TaskVine should know about this file cache: bool # whether TaskVine should cache this file ): self.parsl_name = parsl_name self.stage = stage self.cache = cache + self.netloc = netloc def run_parsl_function(map_file, function_file, argument_file, result_file): From 8aa948591efb2c7b0fb93d1ae4674fd0147e440a Mon Sep 17 00:00:00 2001 From: Colin Thomas Date: Sat, 13 Apr 2024 14:08:33 -0400 Subject: [PATCH 04/14] work with regular files --- parsl/executors/taskvine/stub_staging_provider.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/parsl/executors/taskvine/stub_staging_provider.py b/parsl/executors/taskvine/stub_staging_provider.py index 2984196b51..9e16bc8e8b 100644 --- a/parsl/executors/taskvine/stub_staging_provider.py +++ b/parsl/executors/taskvine/stub_staging_provider.py @@ -23,9 +23,13 @@ def can_stage_out(self, file): return file.scheme in known_url_schemes def stage_in(self, dm: "DataManager", executor: str, file: File, parent_fut: Optional[Future]) -> Optional[DataFuture]: + if file.netloc == '': + file.netloc = file.filename file.local_path = file.netloc return None def stage_out(self, dm: "DataManager", executor: str, file: File, app_fu: Future) -> Optional[Future]: + if file.netloc == '': + file.netloc = file.filename file.local_path = file.netloc return None From 8c91bfdc4685a0ae0fcf46e435a976a2fe5b7435 Mon Sep 17 00:00:00 2001 From: Colin Thomas Date: Sun, 14 Apr 2024 16:40:35 -0400 Subject: [PATCH 05/14] formatting --- parsl/executors/taskvine/executor.py | 4 ++-- parsl/executors/taskvine/manager.py | 8 ++------ parsl/executors/taskvine/stub_staging_provider.py | 10 +++++----- parsl/executors/taskvine/utils.py | 1 - 4 files changed, 9 insertions(+), 14 deletions(-) diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index 7ff980c328..6f173f3e9b 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -105,7 +105,7 @@ def __init__(self, manager_config: TaskVineManagerConfig = TaskVineManagerConfig(), factory_config: TaskVineFactoryConfig = TaskVineFactoryConfig(), provider: Optional[ExecutionProvider] = LocalProvider(init_blocks=1), - working_dir = '.', + working_dir: str = '.', storage_access: Optional[List[Staging]] = None): self.working_dir = working_dir @@ -220,7 +220,7 @@ def __create_data_and_logging_dirs(self): # Create directories for data and results log_dir = os.path.join(run_dir, self.label) - self._function_data_dir = os.path.join(f"/tmp/function_data/", self.label, run_idx) + self._function_data_dir = os.path.join("/tmp/function_data/", self.label, run_idx) os.makedirs(log_dir) os.makedirs(self._function_data_dir) diff --git a/parsl/executors/taskvine/manager.py b/parsl/executors/taskvine/manager.py index b118c20766..cd12d47f5b 100644 --- a/parsl/executors/taskvine/manager.py +++ b/parsl/executors/taskvine/manager.py @@ -127,11 +127,12 @@ def _prepare_environment_regular(m, manager_config, t, task, poncho_env_to_file, if poncho_env_file is not None: t.add_environment(poncho_env_file) + def _handle_file_declaration_protocol(m, filename, cache): if "taskvinetemp://" in filename: return m.declare_temp() elif "https://" in filename or "http://" in filename: - return m.declare_url(filename, cache=cache, peer_transfer=True) + return m.declare_url(filename, cache=cache, peer_transfer=True) else: return m.declare_file(filename, cache=cache, peer_transfer=True) @@ -186,8 +187,6 @@ def _taskvine_submit_wait(ready_task_queue=None, # dict[str] -> vine File object parsl_file_name_to_vine_file = {} - parsl_file_possible_temp = {} - # Mapping of tasks from vine id to parsl id # Dict[str] -> str vine_id_to_executor_task_id = {} @@ -368,7 +367,6 @@ def _taskvine_submit_wait(ready_task_queue=None, # not staged by taskvine. # Files that share the same local path are assumed to be the same # and thus use the same Vine File object if detected. - if not manager_config.shared_fs: for spec in task.input_files: if spec.stage: @@ -378,7 +376,6 @@ def _taskvine_submit_wait(ready_task_queue=None, task_in_file = _handle_file_declaration_protocol(m, spec.parsl_name, spec.cache) parsl_file_name_to_vine_file[spec.parsl_name] = task_in_file logger.debug("Adding input file {}, {} to TaskVine".format(task_in_file, task.executor_id)) -# split_name = spec.parsl_name.split("/")[-1] t.add_input(task_in_file, spec.netloc) for spec in task.output_files: @@ -389,7 +386,6 @@ def _taskvine_submit_wait(ready_task_queue=None, task_out_file = _handle_file_declaration_protocol(m, spec.parsl_name, spec.cache) parsl_file_name_to_vine_file[spec.parsl_name] = task_out_file logger.debug("Adding output file {}, {} to TaskVine".format(task_out_file, task.executor_id)) - split_name = spec.parsl_name.split("/")[-1] t.add_output(task_out_file, spec.netloc) # Submit the task to the TaskVine object diff --git a/parsl/executors/taskvine/stub_staging_provider.py b/parsl/executors/taskvine/stub_staging_provider.py index 9e16bc8e8b..8eee8b8c85 100644 --- a/parsl/executors/taskvine/stub_staging_provider.py +++ b/parsl/executors/taskvine/stub_staging_provider.py @@ -5,15 +5,15 @@ from parsl.data_provider.files import File from concurrent.futures import Future from parsl.app.futures import DataFuture -from typing import Optional, Callable -import os +from typing import Optional logger = logging.getLogger(__name__) known_url_schemes = ["file", "http", "https", "taskvinetemp"] + class StubStaging(Staging, RepresentationMixin): - + def can_stage_in(self, file): logger.debug("Task vine staging provider checking passthrough for {}".format(repr(file))) return file.scheme in known_url_schemes @@ -22,13 +22,13 @@ def can_stage_out(self, file): logger.debug("Task vine staging provider checking passthrough for {}".format(repr(file))) return file.scheme in known_url_schemes - def stage_in(self, dm: "DataManager", executor: str, file: File, parent_fut: Optional[Future]) -> Optional[DataFuture]: + def stage_in(self, dm, executor: str, file: File, parent_fut: Optional[Future]) -> Optional[DataFuture]: if file.netloc == '': file.netloc = file.filename file.local_path = file.netloc return None - def stage_out(self, dm: "DataManager", executor: str, file: File, app_fu: Future) -> Optional[Future]: + def stage_out(self, dm, executor: str, file: File, app_fu: Future) -> Optional[Future]: if file.netloc == '': file.netloc = file.filename file.local_path = file.netloc diff --git a/parsl/executors/taskvine/utils.py b/parsl/executors/taskvine/utils.py index af79ebe1e6..839c6e7173 100644 --- a/parsl/executors/taskvine/utils.py +++ b/parsl/executors/taskvine/utils.py @@ -1,5 +1,4 @@ from typing import Optional -import uuid class ParslTaskToVine: From d192344518b98816bb29cab9eb2214ceaf30688b Mon Sep 17 00:00:00 2001 From: Colin Thomas Date: Fri, 19 Apr 2024 14:13:33 -0400 Subject: [PATCH 06/14] test config --- parsl/tests/configs/taskvine_ex.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/parsl/tests/configs/taskvine_ex.py b/parsl/tests/configs/taskvine_ex.py index 276e37cf4e..4aac29dfa1 100644 --- a/parsl/tests/configs/taskvine_ex.py +++ b/parsl/tests/configs/taskvine_ex.py @@ -1,10 +1,12 @@ from parsl.config import Config -from parsl.data_provider.file_noop import NoOpFileStaging -from parsl.data_provider.ftp import FTPInTaskStaging -from parsl.data_provider.http import HTTPInTaskStaging -from parsl.executors.taskvine import TaskVineExecutor, TaskVineManagerConfig +from parsl.executors.taskvine import TaskVineExecutor +from parsl.executors.taskvine import TaskVineManagerConfig +from parsl.data_provider.http import HTTPInTaskStaging +from parsl.data_provider.ftp import FTPInTaskStaging +from parsl.executors.taskvine.stub_staging_provider import StubStaging def fresh_config(): return Config(executors=[TaskVineExecutor(manager_config=TaskVineManagerConfig(port=9000), - worker_launch_method='factory')]) + worker_launch_method='factory', + storage_access=[FTPInTaskStaging(), StubStaging()])]) From 552c4467f8f620dd34b60966909903b918369eff Mon Sep 17 00:00:00 2001 From: Colin Thomas Date: Mon, 22 Apr 2024 10:45:22 -0400 Subject: [PATCH 07/14] format --- parsl/tests/configs/taskvine_ex.py | 1 + 1 file changed, 1 insertion(+) diff --git a/parsl/tests/configs/taskvine_ex.py b/parsl/tests/configs/taskvine_ex.py index 4aac29dfa1..8daadfdac1 100644 --- a/parsl/tests/configs/taskvine_ex.py +++ b/parsl/tests/configs/taskvine_ex.py @@ -6,6 +6,7 @@ from parsl.data_provider.ftp import FTPInTaskStaging from parsl.executors.taskvine.stub_staging_provider import StubStaging + def fresh_config(): return Config(executors=[TaskVineExecutor(manager_config=TaskVineManagerConfig(port=9000), worker_launch_method='factory', From e27ae0260496c8bc384a5579cfcb598990ca2a93 Mon Sep 17 00:00:00 2001 From: Colin Thomas Date: Thu, 23 May 2024 15:01:03 -0400 Subject: [PATCH 08/14] logging dir, and local file path fix --- parsl/executors/taskvine/executor.py | 4 ++-- parsl/executors/taskvine/stub_staging_provider.py | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index 6f173f3e9b..c3172749c2 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -220,9 +220,9 @@ def __create_data_and_logging_dirs(self): # Create directories for data and results log_dir = os.path.join(run_dir, self.label) - self._function_data_dir = os.path.join("/tmp/function_data/", self.label, run_idx) + self._function_data_dir = os.path.join("/tmp/function_data/", self.label, str(uuid.uuid4())) os.makedirs(log_dir) - os.makedirs(self._function_data_dir) + os.makedirs(self._function_data_dir, exist_ok=True) # put TaskVine logs outside of a Parsl run as TaskVine caches between runs while # Parsl does not. diff --git a/parsl/executors/taskvine/stub_staging_provider.py b/parsl/executors/taskvine/stub_staging_provider.py index 8eee8b8c85..b152a57794 100644 --- a/parsl/executors/taskvine/stub_staging_provider.py +++ b/parsl/executors/taskvine/stub_staging_provider.py @@ -25,11 +25,13 @@ def can_stage_out(self, file): def stage_in(self, dm, executor: str, file: File, parent_fut: Optional[Future]) -> Optional[DataFuture]: if file.netloc == '': file.netloc = file.filename - file.local_path = file.netloc + if file.scheme in ["taskvinetemp", "https", "http"]: + file.local_path = file.netloc return None def stage_out(self, dm, executor: str, file: File, app_fu: Future) -> Optional[Future]: if file.netloc == '': file.netloc = file.filename - file.local_path = file.netloc + if file.scheme in ["taskvinetemp", "https", "http"]: + file.local_path = file.netloc return None From e09aacd76bd44e062d112905ac08871a8d6370ce Mon Sep 17 00:00:00 2001 From: Colin Thomas Date: Thu, 23 May 2024 15:03:53 -0400 Subject: [PATCH 09/14] flake8 --- parsl/executors/taskvine/executor.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index c3172749c2..6e6de95356 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -216,8 +216,6 @@ def __create_data_and_logging_dirs(self): # Use the current run directory from Parsl run_dir = self.run_dir - run_idx = run_dir.split("/")[-1] - # Create directories for data and results log_dir = os.path.join(run_dir, self.label) self._function_data_dir = os.path.join("/tmp/function_data/", self.label, str(uuid.uuid4())) From ce8c1f7888f6b791c3992f05d258bf9ef8698961 Mon Sep 17 00:00:00 2001 From: Colin Thomas Date: Tue, 28 May 2024 13:06:22 -0400 Subject: [PATCH 10/14] fix --- parsl/executors/taskvine/manager.py | 14 ++++++++++---- parsl/executors/taskvine/stub_staging_provider.py | 2 +- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/parsl/executors/taskvine/manager.py b/parsl/executors/taskvine/manager.py index cd12d47f5b..b5bece9d9a 100644 --- a/parsl/executors/taskvine/manager.py +++ b/parsl/executors/taskvine/manager.py @@ -375,8 +375,11 @@ def _taskvine_submit_wait(ready_task_queue=None, else: task_in_file = _handle_file_declaration_protocol(m, spec.parsl_name, spec.cache) parsl_file_name_to_vine_file[spec.parsl_name] = task_in_file - logger.debug("Adding input file {}, {} to TaskVine".format(task_in_file, task.executor_id)) - t.add_input(task_in_file, spec.netloc) + logger.debug("Adding input file {}, {} to TaskVine".format(spec.parsl_name, task.executor_id)) + if spec.netloc == '': + t.add_input(task_in_file, spec.parsl_name) + else: + t.add_input(task_in_file, spec.netloc) for spec in task.output_files: if spec.stage: @@ -385,8 +388,11 @@ def _taskvine_submit_wait(ready_task_queue=None, else: task_out_file = _handle_file_declaration_protocol(m, spec.parsl_name, spec.cache) parsl_file_name_to_vine_file[spec.parsl_name] = task_out_file - logger.debug("Adding output file {}, {} to TaskVine".format(task_out_file, task.executor_id)) - t.add_output(task_out_file, spec.netloc) + logger.debug("Adding output file {}, {} to TaskVine".format(spec.parsl_name, task.executor_id)) + if spec.netloc == '': + t.add_output(task_out_file, spec.parsl_name) + else: + t.add_output(task_out_file, spec.netloc) # Submit the task to the TaskVine object logger.debug("Submitting executor task {}, {} to TaskVine".format(task.executor_id, t)) diff --git a/parsl/executors/taskvine/stub_staging_provider.py b/parsl/executors/taskvine/stub_staging_provider.py index b152a57794..f977af9d23 100644 --- a/parsl/executors/taskvine/stub_staging_provider.py +++ b/parsl/executors/taskvine/stub_staging_provider.py @@ -9,7 +9,7 @@ logger = logging.getLogger(__name__) -known_url_schemes = ["file", "http", "https", "taskvinetemp"] +known_url_schemes = ["http", "https", "taskvinetemp"] class StubStaging(Staging, RepresentationMixin): From 397fd12f6ae04e6ba77cefaea8ef1fce0591182f Mon Sep 17 00:00:00 2001 From: Colin Thomas Date: Tue, 28 May 2024 13:15:38 -0400 Subject: [PATCH 11/14] explicit data providers --- parsl/tests/configs/taskvine_ex.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/parsl/tests/configs/taskvine_ex.py b/parsl/tests/configs/taskvine_ex.py index 8daadfdac1..eb9593f24f 100644 --- a/parsl/tests/configs/taskvine_ex.py +++ b/parsl/tests/configs/taskvine_ex.py @@ -4,10 +4,12 @@ from parsl.data_provider.http import HTTPInTaskStaging from parsl.data_provider.ftp import FTPInTaskStaging +from parsl.data_provider.file_noop import NoOpFileStaging +from parsl.data_provider.zip import ZipFileStaging from parsl.executors.taskvine.stub_staging_provider import StubStaging def fresh_config(): return Config(executors=[TaskVineExecutor(manager_config=TaskVineManagerConfig(port=9000), worker_launch_method='factory', - storage_access=[FTPInTaskStaging(), StubStaging()])]) + storage_access=[FTPInTaskStaging(), StubStaging(), NoOpFileStaging(), ZipFileStaging()])]) From 3f3b3ab84658087ceed160d3931af8b72d41a534 Mon Sep 17 00:00:00 2001 From: Colin Thomas Date: Tue, 16 Jul 2024 14:58:55 -0400 Subject: [PATCH 12/14] sort imports --- parsl/executors/taskvine/stub_staging_provider.py | 10 +++++----- parsl/tests/configs/taskvine_ex.py | 8 +++----- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/parsl/executors/taskvine/stub_staging_provider.py b/parsl/executors/taskvine/stub_staging_provider.py index f977af9d23..e3212903a3 100644 --- a/parsl/executors/taskvine/stub_staging_provider.py +++ b/parsl/executors/taskvine/stub_staging_provider.py @@ -1,12 +1,12 @@ import logging - -from parsl.utils import RepresentationMixin -from parsl.data_provider.staging import Staging -from parsl.data_provider.files import File from concurrent.futures import Future -from parsl.app.futures import DataFuture from typing import Optional +from parsl.app.futures import DataFuture +from parsl.data_provider.files import File +from parsl.data_provider.staging import Staging +from parsl.utils import RepresentationMixin + logger = logging.getLogger(__name__) known_url_schemes = ["http", "https", "taskvinetemp"] diff --git a/parsl/tests/configs/taskvine_ex.py b/parsl/tests/configs/taskvine_ex.py index eb9593f24f..18937a152d 100644 --- a/parsl/tests/configs/taskvine_ex.py +++ b/parsl/tests/configs/taskvine_ex.py @@ -1,11 +1,9 @@ from parsl.config import Config -from parsl.executors.taskvine import TaskVineExecutor -from parsl.executors.taskvine import TaskVineManagerConfig - -from parsl.data_provider.http import HTTPInTaskStaging -from parsl.data_provider.ftp import FTPInTaskStaging from parsl.data_provider.file_noop import NoOpFileStaging +from parsl.data_provider.ftp import FTPInTaskStaging +from parsl.data_provider.http import HTTPInTaskStaging from parsl.data_provider.zip import ZipFileStaging +from parsl.executors.taskvine import TaskVineExecutor, TaskVineManagerConfig from parsl.executors.taskvine.stub_staging_provider import StubStaging From ac34607db7d2dfb147bae88708e64bb4778e7dc3 Mon Sep 17 00:00:00 2001 From: Colin Thomas Date: Mon, 19 Aug 2024 13:18:50 -0400 Subject: [PATCH 13/14] clean up naming strategy --- parsl/executors/taskvine/executor.py | 4 ++-- parsl/executors/taskvine/manager.py | 8 ++++---- parsl/executors/taskvine/stub_staging_provider.py | 10 ++++------ parsl/executors/taskvine/utils.py | 4 ++-- 4 files changed, 12 insertions(+), 14 deletions(-) diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index 21cdfc71da..87388395c6 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -508,10 +508,10 @@ def _register_file(self, parsl_file): if parsl_file.scheme == 'file' or \ (parsl_file.local_path and os.path.exists(parsl_file.local_path)): to_stage = not os.path.isabs(parsl_file.filepath) - return ParslFileToVine(parsl_file.filepath, parsl_file.netloc, to_stage, to_cache) + return ParslFileToVine(parsl_file.filepath, parsl_file.filepath, to_stage, to_cache) else: # we must stage url and temp files - ptv = ParslFileToVine(parsl_file.url, parsl_file.netloc, True, to_cache) + ptv = ParslFileToVine(parsl_file.url, parsl_file.local_path, True, to_cache) return ptv def _std_output_to_vine(self, fdname, stdfspec): diff --git a/parsl/executors/taskvine/manager.py b/parsl/executors/taskvine/manager.py index b5bece9d9a..b7c6a1fd8a 100644 --- a/parsl/executors/taskvine/manager.py +++ b/parsl/executors/taskvine/manager.py @@ -376,10 +376,10 @@ def _taskvine_submit_wait(ready_task_queue=None, task_in_file = _handle_file_declaration_protocol(m, spec.parsl_name, spec.cache) parsl_file_name_to_vine_file[spec.parsl_name] = task_in_file logger.debug("Adding input file {}, {} to TaskVine".format(spec.parsl_name, task.executor_id)) - if spec.netloc == '': + if spec.remote_name == '': t.add_input(task_in_file, spec.parsl_name) else: - t.add_input(task_in_file, spec.netloc) + t.add_input(task_in_file, spec.remote_name) for spec in task.output_files: if spec.stage: @@ -389,10 +389,10 @@ def _taskvine_submit_wait(ready_task_queue=None, task_out_file = _handle_file_declaration_protocol(m, spec.parsl_name, spec.cache) parsl_file_name_to_vine_file[spec.parsl_name] = task_out_file logger.debug("Adding output file {}, {} to TaskVine".format(spec.parsl_name, task.executor_id)) - if spec.netloc == '': + if spec.remote_name == '': t.add_output(task_out_file, spec.parsl_name) else: - t.add_output(task_out_file, spec.netloc) + t.add_output(task_out_file, spec.remote_name) # Submit the task to the TaskVine object logger.debug("Submitting executor task {}, {} to TaskVine".format(task.executor_id, t)) diff --git a/parsl/executors/taskvine/stub_staging_provider.py b/parsl/executors/taskvine/stub_staging_provider.py index e3212903a3..818d764295 100644 --- a/parsl/executors/taskvine/stub_staging_provider.py +++ b/parsl/executors/taskvine/stub_staging_provider.py @@ -23,15 +23,13 @@ def can_stage_out(self, file): return file.scheme in known_url_schemes def stage_in(self, dm, executor: str, file: File, parent_fut: Optional[Future]) -> Optional[DataFuture]: - if file.netloc == '': - file.netloc = file.filename if file.scheme in ["taskvinetemp", "https", "http"]: - file.local_path = file.netloc + file.local_path = file.url.split('/')[-1] + logger.debug("Task vine staging provider stage in for {}".format(repr(file))) return None def stage_out(self, dm, executor: str, file: File, app_fu: Future) -> Optional[Future]: - if file.netloc == '': - file.netloc = file.filename if file.scheme in ["taskvinetemp", "https", "http"]: - file.local_path = file.netloc + file.local_path = file.url.split('/')[-1] + logger.debug("Task vine staging provider stage out for {}".format(repr(file))) return None diff --git a/parsl/executors/taskvine/utils.py b/parsl/executors/taskvine/utils.py index 839c6e7173..1a6e5643a1 100644 --- a/parsl/executors/taskvine/utils.py +++ b/parsl/executors/taskvine/utils.py @@ -69,14 +69,14 @@ class ParslFileToVine: """ def __init__(self, parsl_name: str, # name of file - netloc: str, # name of file if url + remote_name: str, # name of file if url stage: bool, # whether TaskVine should know about this file cache: bool # whether TaskVine should cache this file ): self.parsl_name = parsl_name self.stage = stage self.cache = cache - self.netloc = netloc + self.remote_name = remote_name def run_parsl_function(map_file, function_file, argument_file, result_file): From 999ed6183ea35315e9625f865d3368e65077d173 Mon Sep 17 00:00:00 2001 From: Colin Thomas Date: Wed, 21 Aug 2024 14:34:23 -0400 Subject: [PATCH 14/14] change provider name and protocol handler --- parsl/executors/taskvine/executor.py | 6 +++--- parsl/executors/taskvine/manager.py | 14 +++++++------- ...ng_provider.py => taskvine_staging_provider.py} | 2 +- parsl/executors/taskvine/utils.py | 4 +++- parsl/tests/configs/taskvine_ex.py | 4 ++-- 5 files changed, 16 insertions(+), 14 deletions(-) rename parsl/executors/taskvine/{stub_staging_provider.py => taskvine_staging_provider.py} (96%) diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index 87388395c6..ca2c9b2f5f 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -508,10 +508,10 @@ def _register_file(self, parsl_file): if parsl_file.scheme == 'file' or \ (parsl_file.local_path and os.path.exists(parsl_file.local_path)): to_stage = not os.path.isabs(parsl_file.filepath) - return ParslFileToVine(parsl_file.filepath, parsl_file.filepath, to_stage, to_cache) + return ParslFileToVine(parsl_file.filepath, parsl_file.filepath, stage=to_stage, cache=to_cache, protocol=parsl_file.scheme) else: # we must stage url and temp files - ptv = ParslFileToVine(parsl_file.url, parsl_file.local_path, True, to_cache) + ptv = ParslFileToVine(parsl_file.url, parsl_file.local_path, stage=True, cache=to_cache, protocol=parsl_file.scheme) return ptv def _std_output_to_vine(self, fdname, stdfspec): @@ -519,7 +519,7 @@ def _std_output_to_vine(self, fdname, stdfspec): return a ParslFileToVine with it. These files are never cached""" fname, mode = putils.get_std_fname_mode(fdname, stdfspec) to_stage = not os.path.isabs(fname) - return ParslFileToVine(fname, fname, stage=to_stage, cache=False) + return ParslFileToVine(fname, fname, stage=to_stage, cache=False, protocol="file") def _prepare_package(self, fn, extra_pkgs): """ Look at source code of apps to figure out their package depedencies diff --git a/parsl/executors/taskvine/manager.py b/parsl/executors/taskvine/manager.py index b7c6a1fd8a..5515eb3d2e 100644 --- a/parsl/executors/taskvine/manager.py +++ b/parsl/executors/taskvine/manager.py @@ -128,13 +128,13 @@ def _prepare_environment_regular(m, manager_config, t, task, poncho_env_to_file, t.add_environment(poncho_env_file) -def _handle_file_declaration_protocol(m, filename, cache): - if "taskvinetemp://" in filename: +def _handle_file_declaration_protocol(m, spec): + if "http" in spec.protocol: + return m.declare_url(spec.parsl_name, cache=spec.cache, peer_transfer=True) + elif spec.protocol == "taskvinetemp": return m.declare_temp() - elif "https://" in filename or "http://" in filename: - return m.declare_url(filename, cache=cache, peer_transfer=True) else: - return m.declare_file(filename, cache=cache, peer_transfer=True) + return m.declare_file(spec.parsl_name, cache=spec.cache, peer_transfer=True) @wrap_with_logs @@ -373,7 +373,7 @@ def _taskvine_submit_wait(ready_task_queue=None, if spec.parsl_name in parsl_file_name_to_vine_file: task_in_file = parsl_file_name_to_vine_file[spec.parsl_name] else: - task_in_file = _handle_file_declaration_protocol(m, spec.parsl_name, spec.cache) + task_in_file = _handle_file_declaration_protocol(m, spec) parsl_file_name_to_vine_file[spec.parsl_name] = task_in_file logger.debug("Adding input file {}, {} to TaskVine".format(spec.parsl_name, task.executor_id)) if spec.remote_name == '': @@ -386,7 +386,7 @@ def _taskvine_submit_wait(ready_task_queue=None, if spec.parsl_name in parsl_file_name_to_vine_file: task_out_file = parsl_file_name_to_vine_file[spec.parsl_name] else: - task_out_file = _handle_file_declaration_protocol(m, spec.parsl_name, spec.cache) + task_out_file = _handle_file_declaration_protocol(m, spec) parsl_file_name_to_vine_file[spec.parsl_name] = task_out_file logger.debug("Adding output file {}, {} to TaskVine".format(spec.parsl_name, task.executor_id)) if spec.remote_name == '': diff --git a/parsl/executors/taskvine/stub_staging_provider.py b/parsl/executors/taskvine/taskvine_staging_provider.py similarity index 96% rename from parsl/executors/taskvine/stub_staging_provider.py rename to parsl/executors/taskvine/taskvine_staging_provider.py index 818d764295..68aef0049f 100644 --- a/parsl/executors/taskvine/stub_staging_provider.py +++ b/parsl/executors/taskvine/taskvine_staging_provider.py @@ -12,7 +12,7 @@ known_url_schemes = ["http", "https", "taskvinetemp"] -class StubStaging(Staging, RepresentationMixin): +class TaskVineStaging(Staging, RepresentationMixin): def can_stage_in(self, file): logger.debug("Task vine staging provider checking passthrough for {}".format(repr(file))) diff --git a/parsl/executors/taskvine/utils.py b/parsl/executors/taskvine/utils.py index 1a6e5643a1..e3f1d31782 100644 --- a/parsl/executors/taskvine/utils.py +++ b/parsl/executors/taskvine/utils.py @@ -71,12 +71,14 @@ def __init__(self, parsl_name: str, # name of file remote_name: str, # name of file if url stage: bool, # whether TaskVine should know about this file - cache: bool # whether TaskVine should cache this file + cache: bool, # whether TaskVine should cache this file + protocol: str, # protocol if url ): self.parsl_name = parsl_name self.stage = stage self.cache = cache self.remote_name = remote_name + self.protocol = protocol def run_parsl_function(map_file, function_file, argument_file, result_file): diff --git a/parsl/tests/configs/taskvine_ex.py b/parsl/tests/configs/taskvine_ex.py index 18937a152d..f42ed9ccbb 100644 --- a/parsl/tests/configs/taskvine_ex.py +++ b/parsl/tests/configs/taskvine_ex.py @@ -4,10 +4,10 @@ from parsl.data_provider.http import HTTPInTaskStaging from parsl.data_provider.zip import ZipFileStaging from parsl.executors.taskvine import TaskVineExecutor, TaskVineManagerConfig -from parsl.executors.taskvine.stub_staging_provider import StubStaging +from parsl.executors.taskvine.taskvine_staging_provider import TaskVineStaging def fresh_config(): return Config(executors=[TaskVineExecutor(manager_config=TaskVineManagerConfig(port=9000), worker_launch_method='factory', - storage_access=[FTPInTaskStaging(), StubStaging(), NoOpFileStaging(), ZipFileStaging()])]) + storage_access=[FTPInTaskStaging(), TaskVineStaging(), NoOpFileStaging(), ZipFileStaging()])])