Skip to content

Commit

Permalink
Dynamic dispatch on socket type
Browse files Browse the repository at this point in the history
  • Loading branch information
connorslade committed Oct 31, 2023
1 parent 70dc0ab commit 25f71ca
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 10 deletions.
2 changes: 1 addition & 1 deletion lib/extensions/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ struct RangeResponse {
impl Middleware for Range {
// Inject the Accept-Ranges header into the response.
fn post(&self, req: &Request, res: &mut Response) -> MiddleResult {
if req.method != Method::GET || req.method != Method::GET {
if req.method != Method::GET {
return MiddleResult::Continue;
}

Expand Down
4 changes: 2 additions & 2 deletions lib/proto/websocket/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
};

use super::xor_mask;
use crate::trace::LazyFmt;
use crate::{trace::LazyFmt, socket::SocketStream};

/// ## Frame Layout
/// ```plain
Expand Down Expand Up @@ -126,7 +126,7 @@ impl Frame {
buf
}

pub fn write(&self, socket: &mut TcpStream) -> io::Result<()> {
pub fn write(&self, socket: &mut SocketStream) -> io::Result<()> {
let buf = self.to_bytes();
trace!(Level::Debug, "[WS] Writing: {:?}", buf);

Expand Down
4 changes: 2 additions & 2 deletions lib/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::consts;
use crate::header::{HeaderName, Headers};
use crate::internal::sync::ForceLockMutex;
use crate::proto::http::status::Status;
use crate::socket::Socket;
use crate::socket::{Socket, Stream, SocketStream};
use crate::{
error::Result, header::headers_to_string, internal::handle::Writeable, Content, Header,
SetCookie,
Expand Down Expand Up @@ -366,7 +366,7 @@ impl ResponseBody {

/// Writes a ResponseBody to a TcpStream.
/// Either in one go if it is static or in chunks if it is a stream.
fn write(&mut self, stream: &mut TcpStream) -> Result<()> {
fn write(&mut self, stream: &mut SocketStream) -> Result<()> {
match self {
ResponseBody::Empty => {}
ResponseBody::Static(data) => stream.write_all(data)?,
Expand Down
33 changes: 28 additions & 5 deletions lib/socket.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{
net::TcpStream,
io::{self, Read, Write},
net::{IpAddr, Shutdown, SocketAddr, TcpStream},
ops::Deref,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Expand All @@ -12,10 +13,32 @@ use crate::{
response::ResponseFlag,
};

pub type SocketStream = Box<dyn Stream + Send + Sync>;

pub trait Stream: Read + Write {
fn peer_addr(&self) -> io::Result<SocketAddr>;
fn try_clone(&self) -> io::Result<SocketStream>;
fn shutdown(&self, shutdown: Shutdown) -> io::Result<()>;
}

impl Stream for TcpStream {
fn peer_addr(&self) -> io::Result<SocketAddr> {
self.peer_addr()
}

fn try_clone(&self) -> io::Result<SocketStream> {
Ok(self.try_clone().map(Box::new)?)
}

fn shutdown(&self, shutdown: Shutdown) -> io::Result<()> {
self.shutdown(shutdown)
}
}

/// Socket is a wrapper around TcpStream that allows for sending a response from other threads.
pub struct Socket {
/// The internal TcpStream.
pub socket: Mutex<TcpStream>,
pub socket: Mutex<Box<dyn Stream + Send + Sync>>,
/// A unique identifier that uniquely identifies this socket.
pub id: u64,
/// A barrier that is used to wait for the response to be sent in the case of a guaranteed send.
Expand All @@ -32,10 +55,10 @@ pub struct Socket {
impl Socket {
/// Create a new `Socket` from a `TcpStream`.
/// Will also create a new unique identifier for the socket.
pub(crate) fn new(socket: TcpStream) -> Self {
pub(crate) fn new(socket: impl Stream + Send + Sync + 'static) -> Self {
static ID: AtomicU64 = AtomicU64::new(0);
Self {
socket: Mutex::new(socket),
socket: Mutex::new(Box::new(socket)),
id: ID.fetch_add(1, Ordering::Relaxed),
barrier: Arc::new(SingleBarrier::new()),
raw: AtomicBool::new(false),
Expand Down Expand Up @@ -76,7 +99,7 @@ impl Socket {
}

impl Deref for Socket {
type Target = Mutex<TcpStream>;
type Target = Mutex<Box<dyn Stream + Send + Sync>>;

fn deref(&self) -> &Self::Target {
&self.socket
Expand Down

0 comments on commit 25f71ca

Please sign in to comment.