Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Never block in try_recv #1000

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 84 additions & 1 deletion crossbeam-channel/src/flavors/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,89 @@ impl<T> Channel<T> {
}
}

/// Attempts to reserve a slot for receiving a message.
///
/// As opposed to `start_recv`, this method will never block even if it encounters incomplete
/// send operations and thus is not linearizable, so should not be used for parking.
fn try_start_recv(&self, token: &mut Token) -> bool {
let backoff = Backoff::new();
let mut head = self.head.index.load(Ordering::Acquire);
let mut block = self.head.block.load(Ordering::Acquire);

loop {
// Calculate the offset of the index into the block.
let offset = (head >> SHIFT) % LAP;

// We reached the end of the block but the block is not installed yet,
// so the send has not completed.
if offset == BLOCK_CAP {
return false;
}

let mut new_head = head + (1 << SHIFT);

if new_head & MARK_BIT == 0 {
atomic::fence(Ordering::SeqCst);
let tail = self.tail.index.load(Ordering::Relaxed);

// If the tail equals the head, that means the channel is empty.
if head >> SHIFT == tail >> SHIFT {
// If the channel is disconnected...
if tail & MARK_BIT != 0 {
// ...then receive an error.
token.list.block = ptr::null();
return true;
} else {
// Otherwise, the receive operation is not ready.
return false;
}
}

// If head and tail are not in the same block, set `MARK_BIT` in head.
if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
new_head |= MARK_BIT;
}
}

// The block can be null here only if the first message is being sent into the channel,
// so the send has not completed yet.
if block.is_null() {
return false;
}

// Try moving the head index forward.
match self.head.index.compare_exchange_weak(
head,
new_head,
Ordering::SeqCst,
Ordering::Acquire,
) {
Ok(_) => unsafe {
// If we've reached the end of the block, move to the next one.
if offset + 1 == BLOCK_CAP {
let next = (*block).wait_next();
let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
if !(*next).next.load(Ordering::Relaxed).is_null() {
next_index |= MARK_BIT;
}

self.head.block.store(next, Ordering::Release);
self.head.index.store(next_index, Ordering::Release);
}

token.list.block = block as *const u8;
token.list.offset = offset;
return true;
},
Err(h) => {
head = h;
block = self.head.block.load(Ordering::Acquire);
backoff.spin();
}
}
}
}

/// Reads a message from the channel.
pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
if token.list.block.is_null() {
Expand Down Expand Up @@ -432,7 +515,7 @@ impl<T> Channel<T> {
pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
let token = &mut Token::default();

if self.start_recv(token) {
if self.try_start_recv(token) {
unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
} else {
Err(TryRecvError::Empty)
Expand Down
Loading