[core][aDAG] Clean up shutdown path #47702

Open
wants to merge 4 commits into
base: master

rkooo567
Contributor

@rkooo567 rkooo567 commented Sep 17, 2024

Why are these changes needed?

Various fixes, including #47685, to clean up the shutdown path.

  • Make teardown idempotent.
  • Remove the non-blocking teardown(wait=False). It is prone to all kinds of weird errors because we don't synchronize properly before shutdown.
  • Previously, we tore down in __del__. That works well at runtime but not at shutdown time, because destruction order is not guaranteed at shutdown. We should use an atexit handler instead. To fix this, I keep track of all compiled DAGs created (via weakref) and tear them down inside the shutdown API, which is called when the interpreter shuts down.
  • Fix asyncio read/write being blocked and joined forever. We now time out the read/write every 1 second and check sys.is_finalizing(), which returns True once the interpreter is exiting. We can't rely on teardown from the atexit handler because asyncio read/write runs in a thread pool, and the thread pool is joined before the atexit handler is executed. See https://github.com/python/cpython/blob/8f82d9aa2191db7826bb7a453fe06ce65f966cf8/Lib/concurrent/futures/thread.py#L37 (this exit handler is always called before Python's regular atexit handlers).
  • Change teardown logs to debug level so that they are not printed unless necessary.
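
The polling approach in the fourth bullet can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: it uses a standard-library queue.Queue as a stand-in for a channel, and the names read_with_shutdown_check, STOPPED, poll_interval_s, and should_stop are hypothetical. Only sys.is_finalizing() and the 1-second interval come from the description above.

```python
import queue
import sys

# Hypothetical sentinel returned when the read is abandoned because the
# interpreter is shutting down.
STOPPED = object()


def read_with_shutdown_check(channel, poll_interval_s=1.0,
                             should_stop=sys.is_finalizing):
    """Block on `channel` in short slices instead of forever.

    `channel` is a queue.Queue standing in for a real channel (assumption).
    Returns the item read, or STOPPED if `should_stop()` is true after a
    timed-out read.
    """
    while True:
        try:
            # Wake up at least every `poll_interval_s` seconds.
            return channel.get(timeout=poll_interval_s)
        except queue.Empty:
            # Nothing arrived in this slice. Stop if the interpreter is
            # exiting so the worker thread can be joined; otherwise retry.
            if should_stop():
                return STOPPED
```

Because the blocking call always returns within one polling interval, a thread running this loop can finish when the thread pool is joined at exit, without depending on an atexit handler that would only run later.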

Related issue number

Closes #47685 (comment)

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@rkooo567 rkooo567 added the "go" label (add ONLY when ready to merge, run all tests) Sep 17, 2024
Contributor

@stephanie-wang stephanie-wang left a comment

LGTM. Can you add a description of the changes to the PR text?

python/ray/dag/compiled_dag_node.py (resolved)
python/ray/dag/compiled_dag_node.py (outdated, resolved)
- return [c.read() for c in self._input_channels]
+ results = []
+ for c in self._input_channels:
+     try:
Contributor Author

add while loop

python/ray/dag/compiled_dag_node.py (outdated, resolved)
@rkooo567 rkooo567 changed the title from "[WIP][core][aDAG] Clean up shutdown path" to "[core][aDAG] Clean up shutdown path" Sep 19, 2024
Contributor

@ruisearch42 ruisearch42 left a comment

Otherwise LGTM

# this process. It tracks them as weakref meaning when the compiled dag
# is GC'ed, it is automatically removed from here. It is used to teardown
# compiled dags at interpreter shutdown time.
_compiled_dag_queue = weakref.WeakValueDictionary()
Contributor

nit: this is a dict, not a queue, right?

@@ -1703,6 +1722,8 @@ def __init__(self):
# Lock to make sure that we only perform teardown for this DAG
# once.
self.in_teardown_lock = threading.Lock()
self.name = "MonitorThread"
Contributor

AdagMonitorThread?
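
As an illustration of the "perform teardown only once" lock in the hunk above, the following sketch shows one way to make teardown idempotent on a named monitor thread. It is an assumption-laden example rather than the PR's implementation: MonitorSketch, _teardown_lock, _torn_down, _stop_event, and cleanup_count are hypothetical names, and the thread name matches the one in the diff, not the rename suggested here.

```python
import threading


class MonitorSketch(threading.Thread):
    """Hypothetical monitor thread with an idempotent teardown()."""

    def __init__(self):
        super().__init__(daemon=True)
        # Naming the thread makes it easy to spot in stack dumps.
        self.name = "MonitorThread"
        # Guards the one-time cleanup below.
        self._teardown_lock = threading.Lock()
        self._torn_down = False
        self._stop_event = threading.Event()
        # Counts how many times cleanup actually ran (for demonstration).
        self.cleanup_count = 0

    def run(self):
        # Stand-in for monitoring work: sleep until teardown is requested.
        self._stop_event.wait()

    def teardown(self):
        # The lock serializes callers; the flag turns repeat calls into
        # no-ops, so teardown is safe to call from several places.
        with self._teardown_lock:
            if self._torn_down:
                return
            self._torn_down = True
            self.cleanup_count += 1
            self._stop_event.set()
```

Doing the cleanup while holding the lock means a second caller returns only after the first caller's cleanup has finished, which matches a blocking teardown; the trade-off is that teardown must not call itself re-entrantly.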

  # has already been destructed, so it is not safe to block in
  # ray.get.
- monitor.teardown(wait=False)
+ monitor.teardown(wait=True)
Contributor

Will this wait indefinitely or delay process shutdown?

Also update the comment above

# the destructor order is not guaranteed. We call this function
# upon `ray.worker.shutdown` which is registered to atexit handler
# so that teardown is properly called before objects are destructed.
def shutdown_compiled_dag_node():
Contributor

node is ambiguous here. Can you rename?

if sys.is_finalizing():
break
else:
break
Contributor

Why is there not an outer check that is similar to L373-374?

Comment on lines +561 to +562
else:
break
Contributor

nit: can we unify with the same code pattern as in L361-L371? i.e., L561-L562 does the same thing as L365.

@@ -1694,7 +1711,9 @@ def _is_same_actor(idx1: int, idx2: int) -> bool:
return not topological_order_exists

def _monitor_failures(self):
outer = self
import weakref
Member

Do we need to import weakref here in case it has already been imported at the top level?

break
except ray.exceptions.RayChannelTimeoutError:
# Interpreter exits. We should stop reading
# so that the thread can join.
Member

what does "the thread" refer to?

# so that the thread can join.
if sys.is_finalizing():
break
pass
Member

do we need pass here?

Labels
go (add ONLY when ready to merge, run all tests)
Development

Successfully merging this pull request may close these issues.

[core][aDAG] asyncio run hangs upon shutdown
4 participants