//! Audit logger //! //! Main entry point for audit logging. use std::sync::Arc; use tokio::sync::mpsc; use tracing::{debug, error, warn}; use crate::event::AuditEvent; use crate::sink::{AuditSink, Result}; /// Configuration for the audit logger #[derive(Debug, Clone)] pub struct AuditLoggerConfig { /// Buffer size for async logging pub buffer_size: usize, /// Whether to block when buffer is full pub block_on_full: bool, /// Whether to log errors to tracing pub log_errors: bool, } impl Default for AuditLoggerConfig { fn default() -> Self { Self { buffer_size: 1000, block_on_full: false, log_errors: true, } } } /// Audit logger /// /// Handles audit event logging with async processing. pub struct AuditLogger { sink: Arc, config: AuditLoggerConfig, } impl AuditLogger { /// Create a new audit logger with a sink pub fn new(sink: impl AuditSink + 'static) -> Self { Self { sink: Arc::new(sink), config: AuditLoggerConfig::default(), } } /// Create with configuration pub fn with_config(sink: impl AuditSink + 'static, config: AuditLoggerConfig) -> Self { Self { sink: Arc::new(sink), config, } } /// Create from an Arc'd sink pub fn from_arc(sink: Arc) -> Self { Self { sink, config: AuditLoggerConfig::default(), } } /// Log an audit event pub async fn log(&self, event: AuditEvent) -> Result<()> { if let Err(e) = self.sink.write(&event).await { if self.config.log_errors { error!( event_id = %event.id, error = %e, "Failed to write audit event" ); } return Err(e); } debug!( event_id = %event.id, kind = ?std::mem::discriminant(&event.kind), "Audit event logged" ); Ok(()) } /// Log an event without waiting for completion /// /// Returns immediately. Errors are logged via tracing. pub fn log_async(&self, event: AuditEvent) { let sink = self.sink.clone(); let log_errors = self.config.log_errors; tokio::spawn(async move { if let Err(e) = sink.write(&event).await { if log_errors { error!( event_id = %event.id, error = %e, "Failed to write audit event (async)" ); } } }); } /// Flush the sink pub async fn flush(&self) -> Result<()> { self.sink.flush().await } /// Close the logger pub async fn close(&self) -> Result<()> { self.sink.close().await } /// Get the underlying sink pub fn sink(&self) -> &Arc { &self.sink } } /// Buffered audit logger with async processing /// /// Uses a channel to buffer events and process them in the background. pub struct BufferedAuditLogger { tx: mpsc::Sender, config: AuditLoggerConfig, } impl BufferedAuditLogger { /// Create a new buffered logger /// /// Spawns a background task to process events. pub fn new(sink: impl AuditSink + 'static, config: AuditLoggerConfig) -> Self { let (tx, rx) = mpsc::channel(config.buffer_size); let sink = Arc::new(sink); // Spawn background processor tokio::spawn(Self::process_events(rx, sink, config.log_errors)); Self { tx, config } } /// Create with default config pub fn with_defaults(sink: impl AuditSink + 'static) -> Self { Self::new(sink, AuditLoggerConfig::default()) } /// Log an audit event /// /// If buffer is full, behavior depends on config.block_on_full pub async fn log(&self, event: AuditEvent) -> bool { if self.config.block_on_full { self.tx.send(event).await.is_ok() } else { match self.tx.try_send(event) { Ok(()) => true, Err(mpsc::error::TrySendError::Full(event)) => { if self.config.log_errors { warn!( event_id = %event.id, "Audit buffer full, event dropped" ); } false } Err(mpsc::error::TrySendError::Closed(_)) => { error!("Audit logger channel closed"); false } } } } /// Process events from the channel async fn process_events( mut rx: mpsc::Receiver, sink: Arc, log_errors: bool, ) { while let Some(event) = rx.recv().await { if let Err(e) = sink.write(&event).await { if log_errors { error!( event_id = %event.id, error = %e, "Failed to write audit event" ); } } } // Channel closed, flush and close sink if let Err(e) = sink.flush().await { if log_errors { error!(error = %e, "Failed to flush audit sink on close"); } } if let Err(e) = sink.close().await { if log_errors { error!(error = %e, "Failed to close audit sink"); } } } } #[cfg(test)] mod tests { use super::*; use crate::sink::MemorySink; #[tokio::test] async fn test_audit_logger() { let sink = Arc::new(MemorySink::new(100)); let logger = AuditLogger::from_arc(sink.clone()); let event = AuditEvent::authn_success("alice", "jwt"); logger.log(event).await.unwrap(); assert_eq!(sink.count().await, 1); } #[tokio::test] async fn test_audit_logger_async() { let sink = Arc::new(MemorySink::new(100)); let logger = AuditLogger::from_arc(sink.clone()); for i in 0..10 { let event = AuditEvent::authn_success(&format!("user-{}", i), "jwt"); logger.log_async(event); } // Give async tasks time to complete tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; assert_eq!(sink.count().await, 10); } #[tokio::test] async fn test_buffered_logger() { let sink = Arc::new(MemorySink::new(100)); let logger = BufferedAuditLogger::new( MemorySinkWrapper(sink.clone()), AuditLoggerConfig::default(), ); for i in 0..10 { let event = AuditEvent::authn_success(&format!("user-{}", i), "jwt"); logger.log(event).await; } // Give background task time to process tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; assert_eq!(sink.count().await, 10); } // Wrapper to allow Arc to be used as AuditSink struct MemorySinkWrapper(Arc); #[async_trait::async_trait] impl AuditSink for MemorySinkWrapper { async fn write(&self, event: &AuditEvent) -> Result<()> { self.0.write(event).await } async fn flush(&self) -> Result<()> { self.0.flush().await } async fn close(&self) -> Result<()> { self.0.close().await } } }