//! Watch functionality use crate::error::{ClientError, Result}; use chainfire_proto::proto::{ watch_client::WatchClient, watch_request, Event, WatchCreateRequest, WatchRequest, }; use futures::StreamExt; use tokio::sync::mpsc; use tonic::transport::Channel; use tracing::{debug, warn}; /// Event received from a watch #[derive(Debug, Clone)] pub struct WatchEvent { /// Event type (Put or Delete) pub event_type: EventType, /// Key that changed pub key: Vec, /// New value (for Put events) pub value: Vec, /// Revision of the change pub revision: u64, } /// Type of watch event #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum EventType { Put, Delete, } /// Handle to a watch stream pub struct WatchHandle { /// Watch ID watch_id: i64, /// Event receiver rx: mpsc::Receiver, } impl WatchHandle { /// Create a new watch pub(crate) async fn new( mut client: WatchClient, key: Vec, range_end: Option>, ) -> Result { let (tx, rx) = mpsc::channel(64); let (req_tx, req_rx) = mpsc::channel(16); // Send initial create request let create_req = WatchRequest { request_union: Some(watch_request::RequestUnion::CreateRequest( WatchCreateRequest { key, range_end: range_end.unwrap_or_default(), start_revision: 0, progress_notify: false, prev_kv: false, watch_id: 0, }, )), }; req_tx .send(create_req) .await .map_err(|_| ClientError::Watch("Failed to send create request".into()))?; // Create bidirectional stream let req_stream = tokio_stream::wrappers::ReceiverStream::new(req_rx); let mut resp_stream = client.watch(req_stream).await?.into_inner(); // Wait for creation confirmation let first_resp = resp_stream .next() .await .ok_or_else(|| ClientError::Watch("No response from server".into()))? .map_err(ClientError::Rpc)?; if !first_resp.created { return Err(ClientError::Watch("Watch creation failed".into())); } let watch_id = first_resp.watch_id; debug!(watch_id, "Watch created"); // Spawn task to process events tokio::spawn(async move { while let Some(result) = resp_stream.next().await { match result { Ok(resp) => { if resp.canceled { debug!(watch_id = resp.watch_id, "Watch canceled"); break; } for event in resp.events { let watch_event = convert_event(event); if tx.send(watch_event).await.is_err() { break; } } } Err(e) => { warn!(error = %e, "Watch stream error"); break; } } } }); Ok(Self { watch_id, rx }) } /// Get the watch ID pub fn id(&self) -> i64 { self.watch_id } /// Receive the next event pub async fn recv(&mut self) -> Option { self.rx.recv().await } } fn convert_event(event: Event) -> WatchEvent { let event_type = if event.r#type == 0 { EventType::Put } else { EventType::Delete }; let (key, value, revision) = event.kv.map(|kv| { (kv.key, kv.value, kv.mod_revision as u64) }).unwrap_or_default(); WatchEvent { event_type, key, value, revision, } }