[core][adag] Separate the outputs of execute and execute_async to multiple refs or futures to allow clients to retrieve them one at a time (#46908) #47305

Merged

Conversation

jeffreyjeffreywang
Copy link
Contributor

@jeffreyjeffreywang jeffreyjeffreywang commented Aug 23, 2024

Why are these changes needed?

Currently, if MultiOutputNode is used to wrap a DAG's output, you get back a single CompiledDAGRef or CompiledDAGFuture, depending on whether execute or execute_async is invoked, that points to a list of all of the outputs. To retrieve one of the outputs, you have to get and deserialize all of them at the same time.

This PR splits the output of execute and execute_async into a list of CompiledDAGRef or CompiledDAGFuture objects when the output is wrapped by MultiOutputNode. This is particularly useful for vLLM tensor parallelism: since all shards return the same result, we only need to fetch the result from one of the workers.
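The difference can be illustrated with a toy model that does not use Ray at all. `ToyChannel`, `combined_get`, and `ToyRef` below are hypothetical stand-ins (not Ray classes), and `pickle` stands in for whatever serialization the real channels use. The sketch only counts how many outputs get deserialized under each scheme.

```python
import pickle


class ToyChannel:
    """Hypothetical stand-in for one output channel holding serialized bytes."""

    def __init__(self, value):
        self._payload = pickle.dumps(value)
        self.reads = 0  # how many times this channel was deserialized

    def read(self):
        self.reads += 1
        return pickle.loads(self._payload)


def combined_get(channels):
    """Old behavior in this toy: one handle, so every output is deserialized."""
    return [channel.read() for channel in channels]


class ToyRef:
    """New behavior in this toy: one handle per output channel."""

    def __init__(self, channel):
        self._channel = channel

    def get(self):
        return self._channel.read()


# Four tensor-parallel shards that all produce the same result.
old_channels = [ToyChannel("tokens") for _ in range(4)]
old_result = combined_get(old_channels)

new_channels = [ToyChannel("tokens") for _ in range(4)]
refs = [ToyRef(channel) for channel in new_channels]
new_result = refs[0].get()  # fetch from a single shard only

old_reads = sum(channel.reads for channel in old_channels)
new_reads = sum(channel.reads for channel in new_channels)
print(old_reads, new_reads)
```

In this toy, the combined handle pays for all four deserializations, while the per-output handles pay for one.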

Related issue number

Resolves #46908

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

jeffreyjeffreywang added 7 commits August 19, 2024 00:20
Signed-off-by: jeffreyjeffreywang <[email protected]>
… CompiledDAGFuture

Signed-off-by: jeffreyjeffreywang <[email protected]>
Signed-off-by: jeffreyjeffreywang <[email protected]>
Signed-off-by: jeffreyjeffreywang <[email protected]>
Signed-off-by: jeffreyjeffreywang <[email protected]>
@jeffreyjeffreywang
Copy link
Contributor Author

Hi Ray developers, I would love some help validating the test_execution_schedule_gpu.py tests, as they require a GPU device, which I don't have.

@anyscalesam anyscalesam added accelerated-dag triage Needs triage (eg: priority, bug/not-bug, and owning component) core Issues that should be addressed in Ray Core labels Aug 26, 2024
Copy link
Contributor

@ruisearch42 ruisearch42 left a comment

First pass. Looks pretty nice. Thanks for the contribution!

python/ray/experimental/compiled_dag_ref.py Outdated Show resolved Hide resolved
python/ray/dag/compiled_dag_node.py Outdated Show resolved Hide resolved
python/ray/dag/compiled_dag_node.py Outdated Show resolved Hide resolved
@stephanie-wang stephanie-wang self-assigned this Aug 27, 2024
Signed-off-by: jeffreyjeffreywang <[email protected]>
Copy link
Contributor

@rkooo567 rkooo567 left a comment

I think generally the approach makes sense! But I am seeing 2 problems.

  1. This is a backward-incompatible change and will break vLLM. How complicated is it to support MultiOutputNode(_multiple_return_refs=True)? I am not suggesting we do it, but I'd like to gauge the complexity.
  2. Even if we don't call ray.get ourselves, I think ray.get is still called and deserialization still happens when the DAG ref is deallocated, because of this code:
    if not self._ray_get_called:
    I think we should improve this part of the code so that it does not deserialize. It is a little tricky to handle, so we can probably handle it as a follow-up.
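The destructor behavior described in the second point can be mimicked with a small toy class. `ToyDagRef` is a hypothetical stand-in, not the Ray class; only the attribute name `_ray_get_called` is taken from the comment above. The sketch assumes CPython, where `__del__` runs as soon as the last reference is dropped.

```python
class ToyDagRef:
    """Hypothetical stand-in for a compiled-DAG ref; not the Ray class."""

    def __init__(self, payload, log):
        self._payload = payload
        self._log = log  # records every (simulated) deserialization
        self._ray_get_called = False  # name taken from the review comment

    def get(self):
        self._ray_get_called = True
        self._log.append("deserialized")
        return self._payload

    def __del__(self):
        # Mirrors the pattern under discussion: an unread ref is still
        # read (and therefore deserialized) when it is deallocated.
        if not self._ray_get_called:
            self.get()


log = []
unused = ToyDagRef([1, -1], log)
del unused  # on CPython, __del__ runs here and triggers get()
print(log)
```

Even though `get()` is never called explicitly, the log ends up with one entry, which is the cost the comment proposes to avoid.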

python/ray/dag/compiled_dag_node.py Outdated Show resolved Hide resolved
python/ray/dag/compiled_dag_node.py Outdated Show resolved Hide resolved
@@ -1564,42 +1570,48 @@ def _execute_until(

TODO(rui): catch the case that user holds onto the CompiledDAGRefs
"""
from ray.dag import DAGContext
if self._max_execution_index < execution_index:
Copy link
Contributor

why do we need this if now?

Copy link
Contributor Author

With the original code, an ImportError occurs when an unused CompiledDAGRef (one on which get() was never called) is destructed. Here is a minimal repro of the original behavior:

foo = Foo.remote()
bar = Bar.remote()

with InputNode() as inp:
    dag = MultiOutputNode([foo.increment.bind(inp), bar.decrement.bind(inp)])

dag = dag.experimental_compile()

ref1 = dag.execute(1)
ref2 = dag.execute(1)

assert ref2.get() == [2, -2]

dag.teardown()
# When exiting the program, ref1.__del__ is invoked.
# Since it has not been called with get(), ref1.get() will
# be invoked subsequently. In _execute_until, even though
# the DAG won't be executed again, we still attempt to
# import a library, namely ray.dag.DAGContext. Since
# Python is shutting down, an ImportError occurs
# (ImportError: sys.meta_path is None, Python is likely
# shutting down).

The same problem persists with the current behavior if the if clause is not introduced. Another benefit of the if clause is that we only import the library and calculate the timeout when necessary: timeout is only used when self._max_execution_index < execution_index is True, so we shouldn't import the library when self._max_execution_index == execution_index. I should keep the following while loop out of the if clause, though.
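A minimal sketch of that guard, under the assumption that the import and the timeout lookup can be isolated in one helper. `ToyCompiledDag` and `_load_timeout` are hypothetical, and importing `json` merely stands in for `from ray.dag import DAGContext`. As noted above, the while loop stays outside the if clause.

```python
import importlib


class ToyCompiledDag:
    """Hypothetical stand-in; only the index bookkeeping is modeled."""

    def __init__(self):
        self._max_execution_index = -1
        self.import_calls = 0

    def _load_timeout(self):
        # Stand-in for importing DAGContext and reading a timeout from it.
        self.import_calls += 1
        importlib.import_module("json")
        return 10.0

    def execute_until(self, execution_index):
        timeout = None
        if self._max_execution_index < execution_index:
            # Only import and compute the timeout when there is work to do.
            timeout = self._load_timeout()
        while self._max_execution_index < execution_index:
            # A real implementation would read channels using `timeout`.
            self._max_execution_index += 1
        return self._max_execution_index


dag = ToyCompiledDag()
dag.execute_until(1)  # advances through indices 0 and 1, imports once
dag.execute_until(1)  # already caught up, so no import is attempted
print(dag.import_calls)
```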

Copy link
Contributor Author

While writing this, I found that if the last CompiledDAGRef is unused, the ImportError persists even when the if clause is introduced. Is there any guidance telling Ray users to always invoke get() on every CompiledDAGRef? If not, this may be a bug that needs to be addressed in a separate PR. WDYT?

Here is a minimal repro:

foo = Foo.remote()
bar = Bar.remote()

with InputNode() as inp:
    dag = MultiOutputNode([foo.increment.bind(inp), bar.decrement.bind(inp)])

dag = dag.experimental_compile()

ref1 = dag.execute(1)
ref2 = dag.execute(1)

assert ref1.get() == [1, -1]

dag.teardown()
# Upon destruction, the DAG will be executed until the latest index. Again,
# we attempt to import DAGContext during program exit. 

Copy link
Contributor Author

For now, I think it's worth keeping the if clause and filing a separate bug to track the dangling CompiledDAGRef issue.

Copy link
Contributor Author

I think this issue I mentioned is correlated with your second point. Is there any reason why we want to avoid execution result leak (which is why we currently execute the DAG until the latest index and get() all results)?

if self._max_execution_index + 1 == execution_index:
# Directly fetch and return without buffering
while self._max_execution_index < execution_index:
if len(self._result_buffer) >= self._max_buffered_results:
Copy link
Contributor

I think this is wrong now. We should do len(self._result_buffer) * num_output_channels

Copy link
Contributor

can you also add a unit test? (or modify existing test)

Copy link
Contributor

I think this is okay, it matches the previous semantics. Maybe the naming is not good - it should be the number of DAG executions buffered, not the number of individual results buffered.

Copy link
Contributor Author

Agree. len(self._result_buffer) indicates the number of DAG executions while len(self._result_buffer) * num_output_channels represents the total number of outputs. I will adjust the naming of _max_buffered_results to _max_buffered_executions (any other suggestions?) and add a unit test.

Copy link
Contributor Author

Actually, I think we would also need to change max_buffered_results to max_buffered_executions if we were to rename _max_buffered_results to _max_buffered_executions. Since there are quite a few references to max_buffered_results in the repo (including developer APIs and python/ray/dag/context.py, which has little to do with the changes in this PR), I'd suggest not modifying the naming now. We can track it as a bug and address it in a follow-up.

As for the unit test, I'll add one tomorrow.

Copy link
Contributor Author

Done adding a unit test for cases when there are multiple outputs. Now, if not all results from an execution index are fetched with get(), that execution index will still count towards the number of buffered results. Let me know if this sounds okay to you!
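The counting semantics discussed in this thread can be sketched as follows. `ToyResultBuffer` is a hypothetical class, not the buffer in compiled_dag_node.py; it only assumes that results are keyed by execution index and that an execution leaves the buffer once all of its outputs have been fetched.

```python
class ToyResultBuffer:
    """Hypothetical buffer that counts executions, not individual outputs."""

    def __init__(self, max_buffered_results):
        self._max_buffered_results = max_buffered_results
        self._buffer = {}  # execution_index -> {channel_index: value}

    def is_full(self):
        # len() counts DAG executions, regardless of outputs per execution.
        return len(self._buffer) >= self._max_buffered_results

    def put(self, execution_index, outputs):
        if self.is_full():
            raise ValueError("too many buffered executions")
        self._buffer[execution_index] = dict(enumerate(outputs))

    def pop(self, execution_index, channel_index):
        outputs = self._buffer[execution_index]
        value = outputs.pop(channel_index)
        if not outputs:
            # Every output of this execution was fetched; free the slot.
            del self._buffer[execution_index]
        return value

    def __len__(self):
        return len(self._buffer)


buf = ToyResultBuffer(max_buffered_results=2)
buf.put(0, [1, -1])
buf.put(1, [2, -2])
first = buf.pop(0, 0)
full_after_partial_fetch = buf.is_full()  # execution 0 still holds one output
second = buf.pop(0, 1)
full_after_complete_fetch = buf.is_full()
print(full_after_partial_fetch, full_after_complete_fetch)
```

A partially fetched execution keeps occupying a slot, which matches the behavior described in the comment above.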

python/ray/dag/compiled_dag_node.py Outdated Show resolved Hide resolved
python/ray/dag/compiled_dag_node.py Outdated Show resolved Hide resolved
python/ray/dag/compiled_dag_node.py Outdated Show resolved Hide resolved
@jeffreyjeffreywang
Copy link
Contributor Author

Hey @rkooo567, thank you so much for the review. Let me experiment a few things you suggested and get back to you tomorrow.

Copy link
Contributor

@stephanie-wang stephanie-wang left a comment

Thanks for this contribution!

python/ray/dag/compiled_dag_node.py Outdated Show resolved Hide resolved
python/ray/dag/compiled_dag_node.py Outdated Show resolved Hide resolved
python/ray/dag/tests/experimental/test_accelerated_dag.py Outdated Show resolved Hide resolved
python/ray/dag/compiled_dag_node.py Outdated Show resolved Hide resolved
@stephanie-wang stephanie-wang added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Aug 29, 2024
Signed-off-by: jeffreyjeffreywang <[email protected]>
@jeffreyjeffreywang
Copy link
Contributor Author

Thanks for all of the feedback. I'll keep addressing comments tomorrow and over the weekend. Will ping you as soon as the revision is ready for review.

@jeffreyjeffreywang
Copy link
Contributor Author

jeffreyjeffreywang commented Aug 30, 2024

This is a backward-incompatible change and will break vLLM. How complicated is it to support MultiOutputNode(_multiple_return_refs=True)? I am not suggesting we do it, but I'd like to gauge the complexity.

@rkooo567, I think this should be quite straightforward to support. Essentially, we can ask clients to specify multiple_return_refs=True when they want a list of refs rather than a single ref wrapping all outputs. execute and execute_async can check whether MultiOutputNode is used to wrap the output and whether multiple_return_refs is set to True. If both conditions are satisfied, we return a list of refs; otherwise, we fall back to the original behavior. Could you please help me understand vLLM's use case so I can check whether this workaround is sufficient? Also, please let me know if there's any corner case I'm missing.
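The dispatch described above could look roughly like this. `toy_execute` and its parameters are hypothetical, and tuples stand in for refs; the flag name follows the `multiple_return_refs` spelling used in this comment.

```python
def toy_execute(outputs, wrapped_by_multi_output_node, multiple_return_refs=False):
    """Hypothetical dispatcher; tuples stand in for refs."""
    if wrapped_by_multi_output_node and multiple_return_refs:
        # Opt-in path: one ref per output.
        return [("ref", index) for index in range(len(outputs))]
    # Fallback path: a single ref wrapping all outputs (original behavior).
    return ("ref", "all")


legacy = toy_execute([2, -2], wrapped_by_multi_output_node=True)
opted_in = toy_execute(
    [2, -2], wrapped_by_multi_output_node=True, multiple_return_refs=True
)
single = toy_execute(
    [2], wrapped_by_multi_output_node=False, multiple_return_refs=True
)
print(legacy, opted_in, single)
```

Note that this only sketches the proposal at this point in the thread; the author later confirms that the merged change always returns a list for MultiOutputNode.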

jeffreyjeffreywang added 3 commits September 2, 2024 01:07
…etchers for synchronous case

Signed-off-by: jeffreyjeffreywang <[email protected]>
Signed-off-by: jeffreyjeffreywang <[email protected]>
… is only a single output channel

Signed-off-by: jeffreyjeffreywang <[email protected]>
@jeffreyjeffreywang
Copy link
Contributor Author

Please hold off reviewing as I'm now working on making it backward compatible. Thanks!

jeffreyjeffreywang added 3 commits September 2, 2024 22:08
@jeffreyjeffreywang
Copy link
Contributor Author

The change is now backward compatible. Working on unit tests and addressing the remaining comments.

Signed-off-by: jeffreyjeffreywang <[email protected]>
@jeffreyjeffreywang
Copy link
Contributor Author

jeffreyjeffreywang commented Sep 6, 2024

Overall structure looks good! Just some comments to unify the sync/async codepaths more, and ideally can we remove self._multiple_return_refs?

Thanks for the feedback, Stephanie! I refactored the code, unified sync/async codepaths, and added one more test case. I think we still need multiple_return_refs though (see my comments in the relevant thread).

Here are some pending issues that we might want to address as follow-ups according to our discussion above:

  • Deserialization still happens in the destructor of CompiledDAGRef/CompiledDAGFuture
  • ray.get doesn't support fetching results from a list of CompiledDAGRef
  • There isn't sufficient ADAG test coverage for async cases (quite comprehensive for sync cases though)

@stephanie-wang
Copy link
Contributor

This looks great!

Two last things before we merge:

  • could you answer my question in the other thread about why we need returns_multiple_refs?
  • "ray.get doesn't support fetching results from a list of CompiledDAGRef" -> can we support this? Otherwise we will have a breaking API change. A naive solution that just gets the refs in order is good for now.

@jeffreyjeffreywang
Copy link
Contributor Author

jeffreyjeffreywang commented Sep 7, 2024

"ray.get doesn't support fetching results from a list of CompiledDAGRef" -> can we support this? Otherwise we will have a breaking API change. A naive solution that just gets the refs in order is good for now.

Definitely, will do 😃

@jeffreyjeffreywang
Copy link
Contributor Author

Done supporting fetching a list of CompiledDAGRef with ray.get(). @stephanie-wang, let me know if you have further concerns. Thank you again for all of your feedback!
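The "naive solution that just gets the refs in order" suggested earlier can be sketched like this. `ToyRef` and `toy_get` are hypothetical and do not reflect how ray.get is implemented in python/ray/_private/worker.py.

```python
class ToyRef:
    """Hypothetical stand-in for a ref that records when it is read."""

    def __init__(self, value, order_log):
        self._value = value
        self._order_log = order_log

    def get(self):
        self._order_log.append(self._value)
        return self._value


def toy_get(ref_or_refs):
    """Accept either one ref or a list of refs, fetching lists in order."""
    if isinstance(ref_or_refs, list):
        return [ref.get() for ref in ref_or_refs]
    return ref_or_refs.get()


order_log = []
refs = [ToyRef(2, order_log), ToyRef(-2, order_log)]
results = toy_get(refs)
single_result = toy_get(ToyRef(7, order_log))
print(results, single_result, order_log)
```

Fetching sequentially means a slow first output delays the rest, which is the trade-off accepted by calling the approach naive.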

python/ray/_private/worker.py Outdated Show resolved Hide resolved
python/ray/_private/worker.py Outdated Show resolved Hide resolved
jeffreyjeffreywang added 4 commits September 10, 2024 05:27
Signed-off-by: jeffreyjeffreywang <[email protected]>
…eturn a list of refs

Signed-off-by: jeffreyjeffreywang <[email protected]>
Signed-off-by: jeffreyjeffreywang <[email protected]>
Copy link
Contributor

@stephanie-wang stephanie-wang left a comment

Awesome, thank you for this great contribution!

By the way, if you are not already on it, please feel free to join #ray-accelerated-dags at ray.slack.com. I'm sure there are some interested users there :)

jeffreyjeffreywang and others added 4 commits September 11, 2024 04:55
@jeffreyjeffreywang
Copy link
Contributor Author

jeffreyjeffreywang commented Sep 11, 2024

Thank you so much @stephanie-wang for all of the reviews. I'm looking forward to contributing more. 😄

Would you mind taking a final look at my latest update? I resolved some merge conflicts with SangBin's recent changes.

@jeffreyjeffreywang
Copy link
Contributor Author

Even if we don't call ray.get ourselves, I think ray.get is still called and deserialization still happens when the DAG ref is deallocated, because of this code:

if not self._ray_get_called:

I think we should improve this part of the code so that it does not deserialize. It is a little tricky to handle, so we can probably handle it as a follow-up.

Tracking this issue with #47614.

@stephanie-wang
Copy link
Contributor

Thank you so much @stephanie-wang for all of the reviews. I'm looking forward to contributing more. 😄

Would you mind taking a final look at my latest update? I resolved some merge conflicts with SangBin's recent changes.

It looks good to me! Will merge once tests pass.

@stephanie-wang stephanie-wang enabled auto-merge (squash) September 11, 2024 21:25
@github-actions github-actions bot added the go add ONLY when ready to merge, run all tests label Sep 11, 2024
@stephanie-wang stephanie-wang merged commit a6f923b into ray-project:master Sep 11, 2024
7 checks passed
@rkooo567
Copy link
Contributor

rkooo567 commented Sep 11, 2024

also to be clear @stephanie-wang. @jeffreyjeffreywang it will be a breaking change right?

@jeffreyjeffreywang
Copy link
Contributor Author

also to be clear @stephanie-wang. @jeffreyjeffreywang it will be a breaking change right?

Correct, this will be breaking. execute and execute_async on adag with MultiOutputNode will always return a list of refs/futures.

@rkooo567
Copy link
Contributor

makes sense! Also @jeffreyjeffreywang do we have issues for all follow up problems?

  • Deserialization still happens in the destructor of CompiledDAGRef/CompiledDAGFuture
  • ray.get doesn't support fetching results from a list of CompiledDAGRef
  • There isn't sufficient ADAG test coverage for async cases (quite comprehensive for sync cases though)

@jeffreyjeffreywang
Copy link
Contributor Author

makes sense! Also @jeffreyjeffreywang do we have issues for all follow up problems?

Yup, the second one is addressed in this PR. #47662 and #47614 track the remaining follow-up problems.

@rkooo567
Copy link
Contributor

@jeffreyjeffreywang #47684

Actually, is this a known issue? Can you take a look?

@jeffreyjeffreywang
Copy link
Contributor Author

@rkooo567 Yeah, I also realized this may be a problem over the weekend. I think we need to support calling ray.wait on CompiledDAGFuture and a list of CompiledDAGFuture as well.

Labels
accelerated-dag @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. core Issues that should be addressed in Ray Core go add ONLY when ready to merge, run all tests triage Needs triage (eg: priority, bug/not-bug, and owning component)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[core][adag] aDAGs with multiple outputs should allow getting them one at a time
5 participants