Skip to content

Commit

Permalink
Thread pool documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
connorslade committed Dec 20, 2023
1 parent 0699b55 commit a4fabce
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 4 deletions.
12 changes: 11 additions & 1 deletion lib/internal/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,17 @@ pub(crate) type Writeable = Box<RefCell<dyn Read + Send>>;

/// Handles a socket.
///
/// <https://open.spotify.com/track/50txng2W8C9SycOXKIQP0D>
/// This process consists of:
/// - Parsing the request
/// - Running pre and post middleware
/// - Finding and running the correct route handler
/// - Error handling
/// - etc.
///
/// For further information, check the source code.
/// This is the internal module after all, so don't expect me to keep good documentation.
///
/// [handle me by Sophie Cates on Spotify](https://open.spotify.com/track/50txng2W8C9SycOXKIQP0D)
pub fn handle<State>(stream: Arc<Socket>, this: Arc<Server<State>>)
where
State: 'static + Send + Sync,
Expand Down
42 changes: 39 additions & 3 deletions lib/internal/thread_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
//! To execute a job on the thread pool, use [`ThreadPool::execute`].
//! To resize the thread pool, there are a few different functions:
//! - [`ThreadPool::resize_exact`] - Resizes the thread pool to the specified size.
//! - [`ThreadPool::increase`] - Increases the thread pool size by 1.
//! - [`ThreadPool::decrease`] - Decreases the thread pool size by 1.
//! - [`ThreadPool::increase`] - Spawns a new worker thread, increasing the thread pool size by 1.
//! - [`ThreadPool::decrease`] - Sends a kill message, decreasing the thread pool size by 1.
//!
//! For more information on how the thread pool works, see the documentation for [`ThreadPool`].

use std::{
panic,
Expand All @@ -26,6 +27,25 @@ use crate::{
};

/// A thread pool.
///
/// Consists of a number of worker threads, and a channel to send messages to them.
/// When dropping the thread pool, all workers are stopped and joined.
/// When increasing the size of the thread pool, new workers are spawned.
/// When decreasing the size of the thread pool, a kill message is sent on the channel, and when processed, the worker removes itself from the pool.
///
/// Also note each worker has a unique id, which is calculated by incrementing a static counter.
/// This means that even when a worker is removed, the id will not be reused, until the counter overflows I suppose.
///
/// # Example
/// ```
/// # use afire::internal::thread_pool::ThreadPool;
/// let pool = ThreadPool::new_empty();
///
/// pool.increase();
/// pool.execute(|| {
/// println!("Hello from thread pool!");
/// });
/// ```
pub struct ThreadPool {
/// Handle to each worker thread.
workers: Workers,
Expand Down Expand Up @@ -80,6 +100,7 @@ impl ThreadPool {
}
}

/// Create a new empty thread pool with zero threads.
pub fn new_empty() -> Self {
let (sender, rx) = mpsc::channel();
Self {
Expand All @@ -96,17 +117,28 @@ impl ThreadPool {
self.sender.force_lock().send(job).unwrap();
}

/// Returns the number of threads that should be in the pool.
/// This is not necessarily the number of threads that are in the pool as when resizing, the threads are not immediately removed but the count is immediately updated.
pub fn threads(&self) -> usize {
self.threads.load(Ordering::Relaxed)
}

/// Returns the number of threads that are in the pool.
/// This is more accurate than [`ThreadPool::threads`] as it does not update the count until the threads are actually removed.
/// But it is also slower as it locks the workers mutex to count the threads.
pub fn threads_exact(&self) -> usize {
self.workers.inner.force_lock().len()
}

/// Returns the index of the thread calling this function.
/// Returns `None` if the thread is not a worker thread.
/// Returns `None` if the thread is not a worker thread of this thread pool.
pub fn current_thread(&self) -> Option<usize> {
let thread = thread::current();
self.workers.find(thread.id())
}

/// Resizes the thread pool to the specified size.
/// Depending on how the size changes, the [`ThreadPool::increase`] or [`ThreadPool::decrease`] functions are repeatedly called to resize the pool.
pub fn resize_exact(&self, size: usize) {
assert!(size > 0);
trace!(Level::Debug, "Resizing thread pool to {}", size);
Expand All @@ -129,13 +161,16 @@ impl ThreadPool {
}
}

/// Spawns a new worker thread, increasing the thread pool size by 1.
pub fn increase(&self) {
trace!(Level::Debug, "Increasing thread pool size by 1");
self.workers
.push(Worker::new(self.receiver.clone(), self.workers.clone()));
self.threads.fetch_add(1, Ordering::Relaxed);
}

/// Sends a kill message to a worker thread, decreasing the thread pool size by 1.
/// If all workers are busy, this will not force a worker to stop,
pub fn decrease(&self) {
trace!(Level::Debug, "Decreasing thread pool size by 1");
let sender = self.sender.force_lock();
Expand All @@ -156,6 +191,7 @@ impl Workers {
}

fn remove(&self, id: usize) -> Option<()> {
trace!(Level::Debug, "Worker thread #{id} killed");
let mut list = self.inner.force_lock();
let idx = list.iter().position(|x| x.id == id)?;
list.remove(idx);
Expand Down

0 comments on commit a4fabce

Please sign in to comment.