diff --git a/Cargo.toml b/Cargo.toml index 902fb44..5b01a7d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,4 +31,5 @@ assert_cmd = "2.0.14" assert_json = "0.1.0" libc = "0.2.153" predicates = "3.1.0" +tempdir = "0.3.7" diff --git a/endure-lib/Cargo.toml b/endure-lib/Cargo.toml index e0ed3c7..2bce078 100644 --- a/endure-lib/Cargo.toml +++ b/endure-lib/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] async-trait = "0.1.77" +chrono = "0.4.35" futures = "0.3.30" pcap = { version = "1.2.0", features = ["capture-stream"] } thiserror = "1.0.40" @@ -15,3 +16,4 @@ tokio = { version = "1.36.0", features = ["full"] } [dev-dependencies] libc = "0.2.153" +predicates = "3.1.0" diff --git a/endure-lib/src/listener.rs b/endure-lib/src/listener.rs index 09bf3c1..09398e6 100644 --- a/endure-lib/src/listener.rs +++ b/endure-lib/src/listener.rs @@ -2,18 +2,17 @@ //! or multiple interfaces and return the received packets over the common //! channel to a caller. +use chrono::Local; use futures::StreamExt; -use pcap::{Capture, Linktype, Packet, PacketCodec, PacketHeader, PacketStream}; -use std::collections::HashMap; +use pcap::{Capture, Linktype, Packet, PacketCodec, PacketHeader, PacketStream, Savefile}; use std::marker::PhantomData; +use std::path::PathBuf; use std::sync::Arc; +use std::{collections::HashMap, path::Path}; use thiserror::Error; -use tokio::{ - sync::{ - mpsc::{Receiver, Sender}, - Mutex, - }, - task::JoinHandle, +use tokio::sync::{ + mpsc::{Receiver, Sender}, + Mutex, }; /// A default length of the Ethernet frame, IP and UDP headers together. @@ -98,27 +97,47 @@ impl PacketWrapper { /// analyzers. struct PacketWrapperCodec { filter: Option, - data_link: Linktype, + datalink: Linktype, + savefile: Option, } impl PacketWrapperCodec { - fn new(filter: Option, data_link: Linktype) -> Self { + fn new(datalink: Linktype) -> Self { Self { - filter: filter, - data_link: data_link, + datalink, + filter: None, + savefile: None, } } + + /// Sets filter for a codec. + /// + /// If the filter is set, the codec includes it in the parsed packets. + fn with_filter(mut self, filter: Filter) -> Self { + self.filter = Some(filter); + self + } + + /// Enables pcap generation from the encoded packets. + fn with_savefile(mut self, savefile: Savefile) -> Self { + self.savefile = Some(savefile); + self + } } impl PacketCodec for PacketWrapperCodec { type Item = PacketWrapper; + /// Wraps the received packets in the [`PacketWrapper`]. fn decode(&mut self, packet: Packet) -> Self::Item { + if let Some(savefile) = &mut self.savefile { + savefile.write(&packet); + } PacketWrapper { header: packet.header.clone(), data: packet.data.to_vec(), filter: self.filter, - data_link: self.data_link, + data_link: self.datalink, } } } @@ -320,21 +339,17 @@ impl ListenerPool { /// If there is another listener installed for this device already /// it returns [`ListenerAddError::ListenerExists`] error. /// - /// The [`Filter`] applies filtering rules for packets capturing. For example, - /// it can be used to filter only BOOTP packets, only UDP packets, select - /// port number etc. - pub fn add_listener( - &mut self, - interface_name: &str, - filter: Filter, - ) -> Result<(), ListenerAddError> { - if self.listeners.contains_key(interface_name) { + /// # Arguments + /// + /// - `listener` - instance of the listener to be added to the pool. + pub fn add_listener(&mut self, listener: Listener) -> Result<(), ListenerAddError> { + if self.listeners.contains_key(&listener.interface_name) { return Err(ListenerAddError::ListenerExists { - interface_name: interface_name.to_string(), + interface_name: listener.interface_name.to_string(), }); } - let listener = Listener::from_iface(interface_name).with_filter(filter); - self.listeners.insert(interface_name.to_string(), listener); + self.listeners + .insert(listener.interface_name.to_string(), listener); Ok(()) } @@ -411,11 +426,9 @@ pub enum ListenerError { /// There can be at most one listener instance for each interface. #[derive(Debug)] pub struct Listener { - /// Name of the interface from which the packets are captured. interface_name: String, - /// A filter to be used in packets capturing. - filter: Option, - join_handle: Option>>, + packet_filter: Option, + pcap_dir: Option, _marker: PhantomData, } @@ -429,8 +442,8 @@ impl Listener { pub fn from_iface(interface_name: &str) -> Self { Self { interface_name: interface_name.to_string(), - filter: None, - join_handle: None, + packet_filter: None, + pcap_dir: None, _marker: PhantomData, } } @@ -441,13 +454,34 @@ impl Listener { /// /// - `packet_filter` - a packet filter instance used for capturing /// a specific type of the packets. - pub fn with_filter(self, packet_filter: Filter) -> Self { - Self { - interface_name: self.interface_name, - filter: Some(packet_filter), - join_handle: None, - _marker: PhantomData, + pub fn with_filter(mut self, packet_filter: Filter) -> Self { + self.packet_filter = Some(packet_filter); + self + } + + /// Configures the listener to generate a pcap file from the stream. + /// + /// # Arguments + /// + /// - `pcap_dir` - path to a directory where pcap file should be saved. + pub fn save_to(mut self, pcap_dir: &str) -> Self { + self.pcap_dir = Some(pcap_dir.to_string()); + self + } + + /// Returns a path to a pcap file generated by the listener. + fn pcap_file(&self) -> Option { + // Optionally create a pcap file from the stream. + if let Some(pcap_dir) = &self.pcap_dir { + let filename = format!( + "{}.{}.pcap", + self.interface_name, + Local::now().format("%Y-%m-%dT%H:%M:%S") + ); + let path = Path::new(pcap_dir).join(filename); + return Some(path); } + None } /// Runs a loop capturing packets from a stream. @@ -475,8 +509,6 @@ impl Listener { /// Starts the listener. /// /// It applies the specified filter and spawns an asynchronous packet capture. - /// The returned [`Listener`] instance contains a join handle that can - /// be used to await the capture. /// /// # Arguments /// @@ -501,19 +533,28 @@ impl Listener { .open()? .setnonblock()?; + let datalink = capture.get_datalink(); + let mut codec = PacketWrapperCodec::new(datalink); + // Optionally apply the filter. - if let Some(filter) = &self.filter { + if let Some(filter) = self.packet_filter { capture.filter(filter.to_string().as_str(), false)?; + codec = codec.with_filter(filter); } - let data_link = capture.get_datalink(); - let stream = capture.stream(PacketWrapperCodec::new(self.filter.clone(), data_link))?; + + // Optionally create a pcap file from the stream. + if let Some(pcap_file) = self.pcap_file() { + codec = codec.with_savefile(capture.savefile(pcap_file)?); + } + + let stream = capture.stream(codec)?; // Spawn the asynchronous capture. - let join_handle = Some(tokio::spawn(Self::capture_packets(stream, sender))); + let _ = tokio::spawn(Self::capture_packets(stream, sender)); Ok(Listener:: { interface_name: self.interface_name, - filter: self.filter, - join_handle, + packet_filter: self.packet_filter, + pcap_dir: self.pcap_dir, _marker: PhantomData, }) } @@ -538,24 +579,17 @@ impl Listener { } } -impl Listener { - /// Returns a reference to the join handle for the active listener. - pub fn handle(&self) -> &JoinHandle> { - self.join_handle.as_ref().unwrap() - } -} +impl Listener {} #[cfg(test)] mod tests { - use std::sync::Arc; - + use super::{Listener, PacketWrapper, PacketWrapperCodec}; + use crate::listener::{Filter, ListenerAddError, ListenerPool, PacketDataError, Proto}; use pcap::{Device, Linktype, Packet, PacketCodec, PacketHeader}; + use predicates::prelude::*; + use std::sync::Arc; use tokio::sync::Mutex; - use crate::listener::{Filter, ListenerAddError, ListenerPool, PacketDataError, Proto}; - - use super::{Listener, PacketWrapper, PacketWrapperCodec}; - #[test] fn filter_new() { let filter = Filter::new(); @@ -756,7 +790,7 @@ mod tests { data: data.as_ref(), }; let mut codec = - PacketWrapperCodec::new(Some(Filter::new().bootp(67)), Linktype::ATM_RFC1483); + PacketWrapperCodec::new(Linktype::ATM_RFC1483).with_filter(Filter::new().bootp(67)); let packet_wrapper = codec.decode(packet); assert!(packet_wrapper.filter.is_some()); assert_eq!("udp port 67", packet_wrapper.filter.unwrap().to_string()); @@ -772,12 +806,17 @@ mod tests { fn add_listener() { let mut listener_pool = ListenerPool::new(); let filter = Filter::new().udp(); - assert_eq!(listener_pool.add_listener("lo", filter), Ok(())); + let listener = Listener::from_iface("lo").with_filter(filter); + assert!(listener_pool.add_listener(listener).is_ok()); assert!(matches!( - listener_pool.add_listener("lo", Filter::new()).unwrap_err(), + listener_pool + .add_listener(Listener::from_iface("lo")) + .unwrap_err(), ListenerAddError::ListenerExists { .. } )); - assert_eq!(listener_pool.add_listener("lo0", Filter::new()), Ok(())); + assert!(listener_pool + .add_listener(Listener::from_iface("lo0")) + .is_ok()); } #[tokio::test] @@ -793,6 +832,39 @@ mod tests { ) } + #[test] + fn pcap_filename() { + let listener = Listener::from_iface("enp0s3").save_to("/tmp/"); + let pcap_file = listener.pcap_file(); + assert!(pcap_file.is_some()); + let pred = predicate::str::is_match( + "/tmp/enp0s3.\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.pcap", + ) + .unwrap(); + assert!(pred.eval( + pcap_file + .unwrap() + .as_os_str() + .to_os_string() + .to_str() + .unwrap() + )); + } + + #[test] + fn pcap_filename_unspecified() { + let listener = Listener::from_iface("enp0s3"); + let pcap_file = listener.pcap_file(); + assert!(pcap_file.is_none()); + } + + #[test] + fn pcap_filename_not_directory() { + let listener = Listener::from_iface("enp0s3").save_to("/tmp/foo.txt"); + let pcap_file = listener.pcap_file(); + assert!(pcap_file.is_some()); + } + #[test] #[ignore] fn loopback_name() { diff --git a/src/cli.rs b/src/cli.rs index f1968ee..3663f4e 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -13,10 +13,10 @@ //! Cli::parse().run(); //! ``` -use std::process::exit; +use std::{path::Path, process::exit}; use crate::dispatcher::{self, CsvOutputType}; -use endure_lib::listener::{self, Filter}; +use endure_lib::listener::{self, Filter, Listener}; use clap::{Args, Parser, Subcommand}; use futures::executor::block_on; @@ -33,6 +33,41 @@ pub struct Cli { commands: Option, } +/// A parser checking if the specified path is a directory path. +fn directory_path_parser(path: &str) -> Result { + let path = Path::new(path); + if !path.exists() { + return Err(format!("directory {:?} does not exist", path)); + } + if !path.is_dir() { + return Err(format!("{:?} is not a directory", path)); + } + return Ok(path.as_os_str().to_str().unwrap().to_string()); +} + +/// A parser checking if the path of the specified file exists. +/// +/// It excludes the keyword `stdout` from validation. This keyword +/// is used to specify the console as an output for CSV reports. +/// +/// # Result +/// +/// It returns an error when the directory of the specified file does +/// not exist. It doesn't run any checks when the specified value is +/// a `stdout` keyword. +fn directory_path_file_parser(path: &str) -> Result { + if path.eq("stdout") { + return Ok(path.to_string()); + } + let path = Path::new(path); + if let Some(parent) = path.parent() { + if !parent.as_os_str().is_empty() && !parent.exists() { + return Err(format!("directory {:?} does not exist", parent)); + } + } + return Ok(path.as_os_str().to_str().unwrap().to_string()); +} + /// An enum that defines the supported subcommands. #[derive(Subcommand)] enum Commands { @@ -50,6 +85,9 @@ enum Commands { report_interval: u64, #[command(flatten)] reporting: ReportingArgs, + /// Specifies a location of the directory where pcap files are saved. + #[arg(short, long, value_parser = directory_path_parser)] + pcap_directory: Option, }, } @@ -73,7 +111,7 @@ struct InterfaceArgs { struct ReportingArgs { /// File location where the metrics should be periodically written in the CSV format. /// Use stdout to write the metrics to the console. - #[arg(short, long)] + #[arg(short, long, value_parser = directory_path_file_parser)] csv_output: Option, /// Enables the metrics export to Prometheus via the [http-address]/metrics endpoint. #[arg(long, action)] @@ -117,6 +155,7 @@ impl Cli { api, sse, }, + pcap_directory, } => { // Check if the loopback interface has been explicitly. if loopback { @@ -130,7 +169,13 @@ impl Cli { let filter = Filter::new().bootp_server_relay(); // Bind to the specified interfaces. for interface_name in interface_names.iter() { - let result = dispatcher.add_listener(interface_name.as_str(), filter); + let mut listener = + Listener::from_iface(interface_name.as_str()).with_filter(filter); + // Optionally enable saving pcap files. + if let Some(pcap_directory) = pcap_directory.clone() { + listener = listener.save_to(pcap_directory.as_str()); + } + let result = dispatcher.add_listener(listener); if let Some(err) = result.err() { eprintln!("{}", err.to_string()); exit(128); diff --git a/src/dispatcher.rs b/src/dispatcher.rs index 03b8ae9..8669242 100644 --- a/src/dispatcher.rs +++ b/src/dispatcher.rs @@ -36,7 +36,7 @@ use crate::{ analyzer::Analyzer, sse::{self, Event, EventGateway}, }; -use endure_lib::listener::{self, Filter}; +use endure_lib::listener::{self, Inactive, Listener}; /// An enum of errors returned by the [`Dispatcher::dispatch`] #[derive(Debug, Error)] @@ -184,15 +184,14 @@ impl Dispatcher { /// If there is another listener installed for this device already /// it returns [`listener::ListenerAddError::ListenerExists`] error. /// - /// The [`Filter`] applies filtering rules for packets capturing. For example, - /// it can be used to filter only BOOTP packets, only UDP packets, select - /// port number etc. + /// # Arguments + /// + /// - `listener` - instance of the listener to be added. pub fn add_listener( &mut self, - interface_name: &str, - filter: Filter, + listener: Listener, ) -> Result<(), listener::ListenerAddError> { - self.listener_pool.add_listener(interface_name, filter) + self.listener_pool.add_listener(listener) } /// Starts an HTTP server enabling an endpoint for Prometheus. @@ -398,7 +397,7 @@ mod tests { use crate::dispatcher::DispatchError::{CsvWriterError, HttpServerError}; use crate::dispatcher::Dispatcher; use crate::dispatcher::{CsvOutputType, RegistryWrapper}; - use endure_lib::listener::{self, Filter}; + use endure_lib::listener::{self, Filter, Listener}; trait BodyTest { fn as_str(&self) -> &str; @@ -422,12 +421,18 @@ mod tests { fn add_listener() { let mut dispatcher = Dispatcher::new(); let filter = Filter::new().udp(); - assert_eq!(dispatcher.add_listener("lo", filter), Ok(())); + assert!(dispatcher + .add_listener(Listener::from_iface("lo").with_filter(filter)) + .is_ok()); assert!(matches!( - dispatcher.add_listener("lo", Filter::new()).unwrap_err(), + dispatcher + .add_listener(Listener::from_iface("lo").with_filter(Filter::new())) + .unwrap_err(), listener::ListenerAddError::ListenerExists { .. } )); - assert_eq!(dispatcher.add_listener("lo0", Filter::new()), Ok(())); + assert!(dispatcher + .add_listener(Listener::from_iface("lo0").with_filter(Filter::new())) + .is_ok()); } #[tokio::test] diff --git a/tests/cli.rs b/tests/cli.rs index 1d1d628..73fa5cd 100644 --- a/tests/cli.rs +++ b/tests/cli.rs @@ -1,6 +1,7 @@ use assert_cmd::prelude::*; use predicates::prelude::*; -use std::process::Command; +use std::{fs::File, process::Command}; +use tempdir::TempDir; #[test] fn cli_interface_name_unspecified() -> Result<(), Box> { @@ -164,15 +165,46 @@ fn cli_zero_report_interval() -> Result<(), Box> { } #[test] -fn cli_csv_output_non_existing() -> Result<(), Box> { +fn cli_csv_output_directory_non_existing() -> Result<(), Box> { let mut cmd = Command::cargo_bin("endure")?; cmd.arg("collect") .arg("-i") .arg("foo") .arg("-c") - .arg("/tmp/non-existing/file"); + .arg("/tmp/non-existing/file.csv"); cmd.assert().failure().stderr(predicate::str::contains( - "failed to open the \"/tmp/non-existing/file\" file for writing: \"No such file or directory (os error 2)\"" - )); + "directory \"/tmp/non-existing\" does not exist", + )); + Ok(()) +} + +#[test] +fn cli_pcap_directory_non_existing() -> Result<(), Box> { + let mut cmd = Command::cargo_bin("endure")?; + cmd.arg("collect") + .arg("-i") + .arg("foo") + .arg("-p") + .arg("/tmp/non-existing/endure/directory"); + cmd.assert().failure().stderr(predicate::str::contains( + "directory \"/tmp/non-existing/endure/directory\" does not exist", + )); + Ok(()) +} + +#[test] +fn cli_directory_path_not_directory() -> Result<(), Box> { + let dir = TempDir::new("test")?; + let file_path = dir.path().join("tcp.pcap"); + let _ = File::create(&file_path)?; + let mut cmd = Command::cargo_bin("endure")?; + cmd.arg("collect") + .arg("-i") + .arg("foo") + .arg("-p") + .arg(file_path); + cmd.assert() + .failure() + .stderr(predicate::str::contains("tcp.pcap\" is not a directory")); Ok(()) }