
Race condition in AsyncResult.wait and Connection.serve #530

Open
notEvil opened this issue Mar 14, 2023 · 6 comments

notEvil commented Mar 14, 2023

This issue was previously discussed in #492 and recently brought up in #527.

Hi!

I believe there is a race condition which can create a deadlock. Consider threads A and B, and the following sequence of operations (rpyc 5.3.1):

  1. A and B have each sent a request and call serve to wait for their replies.
  2. A acquires the recvlock, receives the reply to B's request, and releases the recvlock before dispatching it.
  3. B acquires the recvlock and blocks in recv, waiting for new data.
  4. A dispatches the reply, setting B's result, but B is already blocked.

If there is no third party, B might wait indefinitely for something that A already received but didn't process in time (B lost the race). I hope this is easy to follow and self-evident. Obviously, the probability of hitting this should be low, but I did hit it and was able to reproduce the issue reliably in the past.
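
To make the window concrete, here is a toy model of it (illustrative only, not rpyc's actual code; unpack is a hypothetical stand-in for brine.load):

import threading

recvlock = threading.Lock()
replies = {}  # seq -> payload, published by whichever thread received it

def unpack(data):                  # hypothetical stand-in for brine.load
    return data                    # assume data is a (seq, payload) pair

def serve(channel):
    with recvlock:
        data = channel.recv()      # B can park here with no third party
    # --- the window: lock released, reply not yet dispatched ---
    seq, payload = unpack(data)
    replies[seq] = payload         # only now does B's result become visible

def wait(channel, seq):
    while seq not in replies:      # not ready yet, so go serve the socket
        serve(channel)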

I found a more concise solution:

diff --git a/rpyc/core/async_.py b/rpyc/core/async_.py
index 0af147d..cf6aef1 100644
--- a/rpyc/core/async_.py
+++ b/rpyc/core/async_.py
@@ -44,16 +44,19 @@ class AsyncResult(object):
         """Waits for the result to arrive. If the AsyncResult object has an
         expiry set, and the result did not arrive within that timeout,
         an :class:`AsyncResultTimeout` exception is raised"""
-        while not (self._is_ready or self.expired):
+        while self._waiting():
             # Serve the connection since we are not ready. Suppose
             # the reply for our seq is served. The callback is this class
             # so __call__ sets our obj and _is_ready to true.
-            self._conn.serve(self._ttl)
+            self._conn.serve(self._ttl, condition=self._waiting)
 
         # Check if we timed out before result was ready
         if not self._is_ready:
             raise AsyncResultTimeout("result expired")
 
+    def _waiting(self):
+        return not (self._is_ready or self.expired)
+
     def add_callback(self, func):
         """Adds a callback to be invoked when the result arrives. The callback
         function takes a single argument, which is the current AsyncResult
diff --git a/rpyc/core/protocol.py b/rpyc/core/protocol.py
index 69643c7..96e549a 100644
--- a/rpyc/core/protocol.py
+++ b/rpyc/core/protocol.py
@@ -260,7 +260,7 @@ class Connection(object):
         return next(self._seqcounter)
 
     def _send(self, msg, seq, args):  # IO
-        data = brine.dump((msg, seq, args))
+        data = brine.I1.pack(msg) + brine.dump((seq, args))  # see _dispatch
         if self._bind_threads:
             this_thread = self._get_thread()
             data = brine.I8I8.pack(this_thread.id, this_thread._remote_thread_id) + data
@@ -392,8 +392,10 @@ class Connection(object):
             self._config["logger"].debug(debug_msg.format(msg, seq))
 
     def _dispatch(self, data):  # serving---dispatch?
-        msg, seq, args = brine.load(data)
+        msg, = brine.I1.unpack(data[:1])  # unpack just msg to reduce time to release
         if msg == consts.MSG_REQUEST:
+            self._recvlock.release()
+            seq, args = brine.load(data[1:])
             if self._bind_threads:
                 self._get_thread()._occupation_count += 1
             self._dispatch_request(seq, args)
@@ -404,15 +406,19 @@ class Connection(object):
                 if this_thread._occupation_count == 0:
                     this_thread._remote_thread_id = UNBOUND_THREAD_ID
             if msg == consts.MSG_REPLY:
+                seq, args = brine.load(data[1:])
                 obj = self._unbox(args)
                 self._seq_request_callback(msg, seq, False, obj)
+                self._recvlock.release()  # late release
             elif msg == consts.MSG_EXCEPTION:
+                self._recvlock.release()
+                seq, args = brine.load(data[1:])
                 obj = self._unbox_exc(args)
                 self._seq_request_callback(msg, seq, True, obj)
             else:
                 raise ValueError(f"invalid message type: {msg!r}")
 
-    def serve(self, timeout=1, wait_for_lock=True):  # serving
+    def serve(self, timeout=1, wait_for_lock=True, condition=lambda: True):  # serving
         """Serves a single request or reply that arrives within the given
         time frame (default is 1 sec). Note that the dispatching of a request
         might trigger multiple (nested) requests, thus this function may be
@@ -427,10 +433,17 @@ class Connection(object):
             # Exit early if we cannot acquire the recvlock
             if not self._recvlock.acquire(False):
                 if wait_for_lock:
+                    if not condition():  # unlikely, but the result could've arrived and another thread could've won the race to acquire
+                        return False
                     # Wait condition for recvlock release; recvlock is not underlying lock for condition
                     return self._recv_event.wait(timeout.timeleft())
                 else:
                     return False
+        if not condition():  # the result arrived and we won the race to acquire, unlucky
+            self._recvlock.release()
+            with self._recv_event:
+                self._recv_event.notify_all()
+            return False
         # Assume the receive rlock is acquired and incremented
         # We must release once BEFORE dispatch, dispatch any data, and THEN notify all (see issue #527 and #449)
         try:
@@ -442,7 +455,6 @@ class Connection(object):
                 self.close()  # sends close async request
             raise
         else:
-            self._recvlock.release()
             if data:
                 self._dispatch(data)  # Dispatch will unbox, invoke callbacks, etc.
                 return True

A unit test may monkey-patch brine.load to hold the thread between the recvlock release and _seq_request_callback while sending in a second thread to win the race.
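
For instance, a minimal sketch of such a test (hypothetical setup: a classic rpyc server on localhost; the patch target and timings are just illustrative):

import threading
import time
from unittest import mock

import rpyc
from rpyc.core import brine

_orig_load = brine.load

def _slow_load(data):
    result = _orig_load(data)
    time.sleep(0.5)   # hold this thread between the recvlock release
    return result     # and the _seq_request_callback invocation

conn = rpyc.classic.connect("localhost")
with mock.patch("rpyc.core.brine.load", new=_slow_load):
    res = rpyc.async_(conn.modules.time.sleep)(0.1)
    # a second serving thread races for the recvlock while the reply
    # is stalled inside _slow_load
    t = threading.Thread(target=conn.serve, kwargs={"timeout": 2})
    t.start()
    res.wait()   # on affected versions this can park in recv() for a
    t.join()     # reply the stalled thread already consumed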

comrumino (Collaborator) commented

I found a way to reproduce some pesky issues (prevalent in 99c5abe as well).

[rpyc]$ RPYC_BIND_THREADS=true taskset -c 0 pyenv exec python -m unittest discover -v  -k test_race -k test_refcount  
True
test_asyncresult_race (tests.test_race.TestRace) ... ok
test_refcount (tests.test_refcount.TestRefcount) ... ok

----------------------------------------------------------------------
Ran 2 tests in 0.532s

OK
[rpyc]$ RPYC_BIND_THREADS=false taskset -c 0 pyenv exec python -m unittest discover -v  -k test_race -k test_refcount  
False
test_asyncresult_race (tests.test_race.TestRace) ... ok
test_refcount (tests.test_refcount.TestRefcount) ... ok

----------------------------------------------------------------------
Ran 2 tests in 0.479s

OK

I fixed the busy loop and corrected a logic error in caf8c1c. The idea is that if a thread is able to acquire the lock, even briefly, then we know that at that point in time no other thread is receiving data. The boolean we used prior cannot provide such guarantees as easily; we can always get more reuse out of the lock, compared to introducing another variable which at best mirrors the recvlock state.
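
Condensed, the probe looks roughly like this (illustrative, not the actual source):

import threading

recvlock = threading.RLock()

def nobody_is_receiving():
    # if the non-blocking acquire succeeds, no other thread holds the
    # lock at this instant, i.e. no other thread is inside recv()
    if recvlock.acquire(blocking=False):
        recvlock.release()
        return True
    return False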

Thanks again for the busy loop catch. Lmk if you notice any other improvements to be made.

notEvil (Author) commented Mar 19, 2023

Now I'm confused, what issues?

The idea is that if a thread is able to acquire the lock, even briefly, then we know that at that point in time no other thread is receiving data. The boolean we used prior cannot provide such guarantees as easily; we can always get more reuse out of the lock, compared to introducing another variable which at best mirrors the recvlock state.

Can you clarify? I don't see the connection between the duration of lock ownership and whether another thread is currently receiving. All access to the boolean is done while holding self._lock, so its state is consistent with everything else under the lock's umbrella. Also, recvlock is not used at all when thread binding is enabled; self._lock is used for all purposes instead. Their purposes are very different, especially now that AsyncResult.wait contains logic previously found in Connection.serve.
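
For contrast, the guarded-flag pattern I mean looks like this (illustrative names only):

import threading

class Conn:
    def __init__(self):
        self._lock = threading.Lock()
        self._receiving = False    # only ever touched under self._lock

    def try_begin_receive(self):
        with self._lock:           # flag accesses are serialized, so the
            if self._receiving:    # value stays consistent with all other
                return False       # state guarded by self._lock
            self._receiving = True
            return True

    def end_receive(self):
        with self._lock:
            self._receiving = False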

comrumino (Collaborator) commented

Anyway, I went down a rabbit hole with too many changes at once. So I reverted back to where your code was merged and started resolving bugs that impacted my ability to refactor.

RPYC_BIND_THREADS=false taskset -c 0 pyenv exec python -m unittest discover -v  -k test_race -k test_refcount -k test_affinity -k test_deploy
RPYC_BIND_THREADS=true taskset -c 0 pyenv exec python -m unittest discover -v  -k test_race -k test_refcount -k test_affinity -k test_deploy

I'd still like to refactor bind threads a bit to make things easier to follow and less brittle.

comrumino (Collaborator) commented

I added a bind-threads test run to the CI/CD, and your merged-in branch is master + some fixes.

comrumino (Collaborator) commented Mar 20, 2023

I was rushed after pushing fixes yesterday, so I'll try to recap the chaos of my commits/learning, @notEvil.

Refactor rabbit hole and breaking changes
Passing the waiting function into serve seemed to introduce some accidental complexity that resulted in checking the state of AsyncResult more than once. This got me started down the refactoring rabbit hole. When testing my refactor, I set BIND_THREADS instead of RPYC_BIND_THREADS, meaning the tests never ran with thread binding enabled. Not to mention, I should've refactored in smaller chunks. I did not realize I had broken bind threads until your comment. Oops!

Reverting back and finding more issues
Before trying to debug my attempted refactor, I checked out 99c5abe to validate that all the tests passed, but they did not. I decided to revert the changes made since 99c5abe at commit 0e9f95a. After undoing the refactor/breaking changes, I made some follow-up commits to improve guard rails for bind threads and fix existing bugs.

  1. Fixed test_deploy to handle RPYC_BIND_THREADS from the environment. SshMachine was running with thread binding disabled while the tests had it enabled, because environment variables are not passed through by default.
  2. e473682 and 5f41958 are improvements around connection-close timing for locks. The bug addressed in those commits caused tests to fail and could keep the interpreter from exiting indefinitely, due to a blocking acquire. I can no longer reproduce the issue after those commits. However, improvements/refactoring are still needed for closing connections and lock handling; being unable to reproduce a race condition does not mean it is fixed.
  3. I added another test step that runs all of the unit tests with RPYC_BIND_THREADS=true until we make it the default behavior.

TODOs/Take-aways

  • improvements/refactoring are needed for closing connections and lock handling
  • consider a better design pattern for AsyncResult.wait (e.g., if bind-threads behavior is assumed, does it make more sense to have a worker thread dedicated to send/recv that uses events, compared to the current implementation of polling in AsyncResult? how much accidental complexity is there due to not using dedicated threads for channel communication? previously this was not done because we did not know which thread a message was intended for); a rough sketch follows this list
  • Diagrams explaining RPyC data flow architecture as it relates to bind threads
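
A rough sketch of that alternative (hypothetical design; assumes framed (seq, payload) messages, per-seq routing, and that expect() is called before the reply arrives; not rpyc's current implementation):

import threading

class DedicatedReceiver:
    # one thread owns recv(); waiters block on per-seq events
    def __init__(self, channel):
        self._channel = channel
        self._lock = threading.Lock()
        self._events = {}    # seq -> threading.Event
        self._payloads = {}  # seq -> reply payload
        threading.Thread(target=self._loop, daemon=True).start()

    def expect(self, seq):
        event = threading.Event()
        with self._lock:
            self._events[seq] = event
        return event

    def wait(self, seq, timeout=None):
        if not self._events[seq].wait(timeout):
            raise TimeoutError("result expired")
        with self._lock:
            del self._events[seq]
            return self._payloads.pop(seq)

    def _loop(self):
        while True:
            seq, payload = self._channel.recv()  # assumed framed recv
            with self._lock:
                self._payloads[seq] = payload
                event = self._events[seq]
            event.set()

The cost would be an extra thread switch per reply, which may be the main trade-off to weigh.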

Thanks for your time/contributions. I aim to make bind threads the default behavior eventually. Before it becomes the default behavior, I'd like to familiarize myself with it, optimize it, make it easier to follow, and document why/how it works.

@notEvil
Copy link
Author

notEvil commented Mar 20, 2023

Refactor rabbit hole and breaking changes

Very unfortunate

Before trying to debug my attempted refactor, I checked out 99c5abe to validate that all the tests passed, but they did not.

When I run the tests, I usually see some failures due to the hardened SSH config on my system, so I might have accidentally missed an issue or two (your 1., for instance). And sometimes the background threads don't shut down gracefully, especially in the gevent test, which leaves the test environment tainted for subsequent tests. That area definitely needs more thought!

consider a better design pattern for AsyncResult.wait (e.g., if bind-threads behavior is assumed, does it make more sense to have a worker thread dedicated to send/recv that uses events, compared to the current implementation of polling in AsyncResult? how much accidental complexity is there due to not using dedicated threads for channel communication? previously this was not done because we did not know which thread a message was intended for)

I don't think that's an option. A dedicated thread for communication would introduce thread switches, which hurt performance. Also, I would say there isn't that much added complexity.

I aim to make bind threads the default behavior eventually. Before it becomes the default behavior, I'd like to familiarize myself with it, optimize it, make it easier to follow, and document why/how it works.

Great, if you need any information or find optimizations, let me know :)
