Skip to content

Commit

Permalink
[#45] Output pcap files from captures
Browse files Browse the repository at this point in the history
  • Loading branch information
msiodelski committed Mar 17, 2024
1 parent 5398401 commit acf373a
Show file tree
Hide file tree
Showing 6 changed files with 238 additions and 81 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,5 @@ assert_cmd = "2.0.14"
assert_json = "0.1.0"
libc = "0.2.153"
predicates = "3.1.0"
tempdir = "0.3.7"

2 changes: 2 additions & 0 deletions endure-lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ edition = "2021"

[dependencies]
async-trait = "0.1.77"
chrono = "0.4.35"
futures = "0.3.30"
pcap = { version = "1.2.0", features = ["capture-stream"] }
thiserror = "1.0.40"
tokio = { version = "1.36.0", features = ["full"] }

[dev-dependencies]
libc = "0.2.153"
predicates = "3.1.0"
194 changes: 133 additions & 61 deletions endure-lib/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,17 @@
//! or multiple interfaces and return the received packets over the common
//! channel to a caller.

use chrono::Local;
use futures::StreamExt;
use pcap::{Capture, Linktype, Packet, PacketCodec, PacketHeader, PacketStream};
use std::collections::HashMap;
use pcap::{Capture, Linktype, Packet, PacketCodec, PacketHeader, PacketStream, Savefile};
use std::marker::PhantomData;
use std::path::PathBuf;
use std::sync::Arc;
use std::{collections::HashMap, path::Path};
use thiserror::Error;
use tokio::{
sync::{
mpsc::{Receiver, Sender},
Mutex,
},
task::JoinHandle,
use tokio::sync::{
mpsc::{Receiver, Sender},
Mutex,
};

/// A default length of the Ethernet frame, IP and UDP headers together.
Expand Down Expand Up @@ -98,27 +97,47 @@ impl PacketWrapper {
/// analyzers.
struct PacketWrapperCodec {
filter: Option<Filter>,
data_link: Linktype,
datalink: Linktype,
savefile: Option<Savefile>,
}

impl PacketWrapperCodec {
fn new(filter: Option<Filter>, data_link: Linktype) -> Self {
fn new(datalink: Linktype) -> Self {
Self {
filter: filter,
data_link: data_link,
datalink,
filter: None,
savefile: None,
}
}

/// Sets filter for a codec.
///
/// If the filter is set, the codec includes it in the parsed packets.
fn with_filter(mut self, filter: Filter) -> Self {
self.filter = Some(filter);
self
}

/// Enables pcap generation from the encoded packets.
fn with_savefile(mut self, savefile: Savefile) -> Self {
self.savefile = Some(savefile);
self
}
}

impl PacketCodec for PacketWrapperCodec {
type Item = PacketWrapper;

/// Wraps the received packets in the [`PacketWrapper`].
fn decode(&mut self, packet: Packet) -> Self::Item {
if let Some(savefile) = &mut self.savefile {
savefile.write(&packet);
}
PacketWrapper {
header: packet.header.clone(),
data: packet.data.to_vec(),
filter: self.filter,
data_link: self.data_link,
data_link: self.datalink,
}
}
}
Expand Down Expand Up @@ -320,21 +339,17 @@ impl ListenerPool {
/// If there is another listener installed for this device already
/// it returns [`ListenerAddError::ListenerExists`] error.
///
/// The [`Filter`] applies filtering rules for packets capturing. For example,
/// it can be used to filter only BOOTP packets, only UDP packets, select
/// port number etc.
pub fn add_listener(
&mut self,
interface_name: &str,
filter: Filter,
) -> Result<(), ListenerAddError> {
if self.listeners.contains_key(interface_name) {
/// # Arguments
///
/// - `listener` - instance of the listener to be added to the pool.
pub fn add_listener(&mut self, listener: Listener<Inactive>) -> Result<(), ListenerAddError> {
if self.listeners.contains_key(&listener.interface_name) {
return Err(ListenerAddError::ListenerExists {
interface_name: interface_name.to_string(),
interface_name: listener.interface_name.to_string(),
});
}
let listener = Listener::from_iface(interface_name).with_filter(filter);
self.listeners.insert(interface_name.to_string(), listener);
self.listeners
.insert(listener.interface_name.to_string(), listener);
Ok(())
}

Expand Down Expand Up @@ -411,11 +426,9 @@ pub enum ListenerError {
/// There can be at most one listener instance for each interface.
#[derive(Debug)]
pub struct Listener<T: State> {
/// Name of the interface from which the packets are captured.
interface_name: String,
/// A filter to be used in packets capturing.
filter: Option<Filter>,
join_handle: Option<JoinHandle<Result<(), ListenerError>>>,
packet_filter: Option<Filter>,
pcap_dir: Option<String>,
_marker: PhantomData<T>,
}

Expand All @@ -429,8 +442,8 @@ impl Listener<Inactive> {
pub fn from_iface(interface_name: &str) -> Self {
Self {
interface_name: interface_name.to_string(),
filter: None,
join_handle: None,
packet_filter: None,
pcap_dir: None,
_marker: PhantomData,
}
}
Expand All @@ -441,13 +454,34 @@ impl Listener<Inactive> {
///
/// - `packet_filter` - a packet filter instance used for capturing
/// a specific type of the packets.
pub fn with_filter(self, packet_filter: Filter) -> Self {
Self {
interface_name: self.interface_name,
filter: Some(packet_filter),
join_handle: None,
_marker: PhantomData,
pub fn with_filter(mut self, packet_filter: Filter) -> Self {
self.packet_filter = Some(packet_filter);
self
}

/// Configures the listener to generate a pcap file from the stream.
///
/// # Arguments
///
/// - `pcap_dir` - path to a directory where pcap file should be saved.
pub fn save_to(mut self, pcap_dir: &str) -> Self {
self.pcap_dir = Some(pcap_dir.to_string());
self
}

/// Returns a path to a pcap file generated by the listener.
fn pcap_file(&self) -> Option<PathBuf> {
// Optionally create a pcap file from the stream.
if let Some(pcap_dir) = &self.pcap_dir {
let filename = format!(
"{}.{}.pcap",
self.interface_name,
Local::now().format("%Y-%m-%dT%H:%M:%S")
);
let path = Path::new(pcap_dir).join(filename);
return Some(path);
}
None
}

/// Runs a loop capturing packets from a stream.
Expand Down Expand Up @@ -475,8 +509,6 @@ impl Listener<Inactive> {
/// Starts the listener.
///
/// It applies the specified filter and spawns an asynchronous packet capture.
/// The returned [`Listener<Active>`] instance contains a join handle that can
/// be used to await the capture.
///
/// # Arguments
///
Expand All @@ -501,19 +533,28 @@ impl Listener<Inactive> {
.open()?
.setnonblock()?;

let datalink = capture.get_datalink();
let mut codec = PacketWrapperCodec::new(datalink);

// Optionally apply the filter.
if let Some(filter) = &self.filter {
if let Some(filter) = self.packet_filter {
capture.filter(filter.to_string().as_str(), false)?;
codec = codec.with_filter(filter);
}
let data_link = capture.get_datalink();
let stream = capture.stream(PacketWrapperCodec::new(self.filter.clone(), data_link))?;

// Optionally create a pcap file from the stream.
if let Some(pcap_file) = self.pcap_file() {
codec = codec.with_savefile(capture.savefile(pcap_file)?);
}

let stream = capture.stream(codec)?;

// Spawn the asynchronous capture.
let join_handle = Some(tokio::spawn(Self::capture_packets(stream, sender)));
let _ = tokio::spawn(Self::capture_packets(stream, sender));
Ok(Listener::<Active> {
interface_name: self.interface_name,
filter: self.filter,
join_handle,
packet_filter: self.packet_filter,
pcap_dir: self.pcap_dir,
_marker: PhantomData,
})
}
Expand All @@ -538,24 +579,17 @@ impl Listener<Inactive> {
}
}

impl Listener<Active> {
/// Returns a reference to the join handle for the active listener.
pub fn handle(&self) -> &JoinHandle<Result<(), ListenerError>> {
self.join_handle.as_ref().unwrap()
}
}
impl Listener<Active> {}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use super::{Listener, PacketWrapper, PacketWrapperCodec};
use crate::listener::{Filter, ListenerAddError, ListenerPool, PacketDataError, Proto};
use pcap::{Device, Linktype, Packet, PacketCodec, PacketHeader};
use predicates::prelude::*;
use std::sync::Arc;
use tokio::sync::Mutex;

use crate::listener::{Filter, ListenerAddError, ListenerPool, PacketDataError, Proto};

use super::{Listener, PacketWrapper, PacketWrapperCodec};

#[test]
fn filter_new() {
let filter = Filter::new();
Expand Down Expand Up @@ -756,7 +790,7 @@ mod tests {
data: data.as_ref(),
};
let mut codec =
PacketWrapperCodec::new(Some(Filter::new().bootp(67)), Linktype::ATM_RFC1483);
PacketWrapperCodec::new(Linktype::ATM_RFC1483).with_filter(Filter::new().bootp(67));
let packet_wrapper = codec.decode(packet);
assert!(packet_wrapper.filter.is_some());
assert_eq!("udp port 67", packet_wrapper.filter.unwrap().to_string());
Expand All @@ -772,12 +806,17 @@ mod tests {
fn add_listener() {
let mut listener_pool = ListenerPool::new();
let filter = Filter::new().udp();
assert_eq!(listener_pool.add_listener("lo", filter), Ok(()));
let listener = Listener::from_iface("lo").with_filter(filter);
assert!(listener_pool.add_listener(listener).is_ok());
assert!(matches!(
listener_pool.add_listener("lo", Filter::new()).unwrap_err(),
listener_pool
.add_listener(Listener::from_iface("lo"))
.unwrap_err(),
ListenerAddError::ListenerExists { .. }
));
assert_eq!(listener_pool.add_listener("lo0", Filter::new()), Ok(()));
assert!(listener_pool
.add_listener(Listener::from_iface("lo0"))
.is_ok());
}

#[tokio::test]
Expand All @@ -793,6 +832,39 @@ mod tests {
)
}

#[test]
fn pcap_filename() {
let listener = Listener::from_iface("enp0s3").save_to("/tmp/");
let pcap_file = listener.pcap_file();
assert!(pcap_file.is_some());
let pred = predicate::str::is_match(
"/tmp/enp0s3.\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.pcap",
)
.unwrap();
assert!(pred.eval(
pcap_file
.unwrap()
.as_os_str()
.to_os_string()
.to_str()
.unwrap()
));
}

#[test]
fn pcap_filename_unspecified() {
let listener = Listener::from_iface("enp0s3");
let pcap_file = listener.pcap_file();
assert!(pcap_file.is_none());
}

#[test]
fn pcap_filename_not_directory() {
let listener = Listener::from_iface("enp0s3").save_to("/tmp/foo.txt");
let pcap_file = listener.pcap_file();
assert!(pcap_file.is_some());
}

#[test]
#[ignore]
fn loopback_name() {
Expand Down
Loading

0 comments on commit acf373a

Please sign in to comment.