//! Node client for communicating with storage nodes use super::{NodeError, NodeResult}; use async_trait::async_trait; use bytes::Bytes; use lightningstor_node::proto::{ ChunkExistsRequest, ChunkSizeRequest, DeleteChunkRequest, GetChunkRequest, PingRequest, PutChunkRequest, }; use lightningstor_node::NodeServiceClient; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; use tokio::sync::RwLock; use tonic::transport::Channel; /// Trait for storage node client operations #[async_trait] pub trait NodeClientTrait: Send + Sync { /// Get the node ID fn node_id(&self) -> &str; /// Get the node endpoint fn endpoint(&self) -> &str; /// Check if the node is currently considered healthy async fn is_healthy(&self) -> bool; /// Store a chunk on this node async fn put_chunk( &self, chunk_id: &str, shard_index: u32, is_parity: bool, data: Bytes, ) -> NodeResult<()>; /// Retrieve a chunk from this node async fn get_chunk( &self, chunk_id: &str, shard_index: u32, is_parity: bool, ) -> NodeResult>; /// Delete a chunk from this node async fn delete_chunk(&self, chunk_id: &str) -> NodeResult<()>; /// Check if a chunk exists on this node async fn chunk_exists(&self, chunk_id: &str) -> NodeResult; /// Get the size of a chunk on this node async fn chunk_size(&self, chunk_id: &str) -> NodeResult>; /// Ping the node to check connectivity async fn ping(&self) -> NodeResult; } /// Real gRPC client for storage nodes /// /// This client communicates with storage nodes over gRPC. /// For now, this is a placeholder that will be implemented /// when the storage node service is created. pub struct NodeClient { node_id: String, endpoint: String, healthy: AtomicBool, client: RwLock>, } impl NodeClient { /// Connect to a storage node at the given endpoint pub async fn connect(endpoint: &str) -> NodeResult { // Ensure endpoint has scheme let endpoint_url = if endpoint.contains("://") { endpoint.to_string() } else { format!("http://{}", endpoint) }; let channel = Channel::from_shared(endpoint_url.clone()) .map_err(|e| NodeError::ConnectionFailed { node_id: "unknown".to_string(), reason: e.to_string(), })? .connect_timeout(Duration::from_secs(5)) .connect() .await .map_err(|e| NodeError::ConnectionFailed { node_id: "unknown".to_string(), reason: e.to_string(), })?; let client = NodeServiceClient::new(channel); // Try to get node status to get the real node ID // If that fails, generate a temporary one based on endpoint, but connection is established let node_id = match client.clone().get_status(lightningstor_node::proto::GetStatusRequest {}).await { Ok(response) => response.into_inner().node_id, Err(_) => format!("node-{}", endpoint.replace([':', '.', '/'], "-")), }; Ok(Self { node_id, endpoint: endpoint.to_string(), healthy: AtomicBool::new(true), client: RwLock::new(client), }) } /// Create a client with a specific node ID pub async fn connect_with_id(node_id: &str, endpoint: &str) -> NodeResult { let endpoint_url = if endpoint.contains("://") { endpoint.to_string() } else { format!("http://{}", endpoint) }; // We use lazy connection here to not block startup if a node is temporarily down let channel = Channel::from_shared(endpoint_url.clone()) .map_err(|e| NodeError::ConnectionFailed { node_id: node_id.to_string(), reason: e.to_string(), })? .connect_timeout(Duration::from_secs(5)) .connect_lazy(); let client = NodeServiceClient::new(channel); Ok(Self { node_id: node_id.to_string(), endpoint: endpoint.to_string(), healthy: AtomicBool::new(true), client: RwLock::new(client), }) } /// Mark the node as unhealthy pub fn mark_unhealthy(&self) { self.healthy.store(false, Ordering::SeqCst); } /// Mark the node as healthy pub fn mark_healthy(&self) { self.healthy.store(true, Ordering::SeqCst); } } #[async_trait] impl NodeClientTrait for NodeClient { fn node_id(&self) -> &str { &self.node_id } fn endpoint(&self) -> &str { &self.endpoint } async fn is_healthy(&self) -> bool { self.healthy.load(Ordering::SeqCst) } async fn put_chunk( &self, chunk_id: &str, shard_index: u32, is_parity: bool, data: Bytes, ) -> NodeResult<()> { if !self.is_healthy().await { return Err(NodeError::Unhealthy(self.node_id.clone())); } let request = PutChunkRequest { chunk_id: chunk_id.to_string(), shard_index, is_parity, data: data.to_vec(), }; let mut client = self.client.write().await; client .put_chunk(request) .await .map(|_| ()) .map_err(|e| NodeError::RpcFailed(e.to_string())) } async fn get_chunk( &self, chunk_id: &str, shard_index: u32, is_parity: bool, ) -> NodeResult> { if !self.is_healthy().await { return Err(NodeError::Unhealthy(self.node_id.clone())); } let request = GetChunkRequest { chunk_id: chunk_id.to_string(), shard_index, is_parity, }; let mut client = self.client.write().await; let response = client .get_chunk(request) .await .map_err(|e| match e.code() { tonic::Code::NotFound => NodeError::NotFound(chunk_id.to_string()), _ => NodeError::RpcFailed(e.to_string()), })?; Ok(response.into_inner().data) } async fn delete_chunk(&self, chunk_id: &str) -> NodeResult<()> { if !self.is_healthy().await { return Err(NodeError::Unhealthy(self.node_id.clone())); } let request = DeleteChunkRequest { chunk_id: chunk_id.to_string(), }; let mut client = self.client.write().await; client .delete_chunk(request) .await .map(|_| ()) .map_err(|e| NodeError::RpcFailed(e.to_string())) } async fn chunk_exists(&self, chunk_id: &str) -> NodeResult { if !self.is_healthy().await { return Err(NodeError::Unhealthy(self.node_id.clone())); } let request = ChunkExistsRequest { chunk_id: chunk_id.to_string(), }; let mut client = self.client.write().await; let response = client .chunk_exists(request) .await .map_err(|e| NodeError::RpcFailed(e.to_string()))?; Ok(response.into_inner().exists) } async fn chunk_size(&self, chunk_id: &str) -> NodeResult> { if !self.is_healthy().await { return Err(NodeError::Unhealthy(self.node_id.clone())); } let request = ChunkSizeRequest { chunk_id: chunk_id.to_string(), }; let mut client = self.client.write().await; let response = client .chunk_size(request) .await .map_err(|e| NodeError::RpcFailed(e.to_string()))?; let inner = response.into_inner(); if inner.exists { Ok(Some(inner.size)) } else { Ok(None) } } async fn ping(&self) -> NodeResult { if !self.is_healthy().await { return Err(NodeError::Unhealthy(self.node_id.clone())); } let start = std::time::Instant::now(); let request = PingRequest {}; let mut client = self.client.write().await; let _ = client .ping(request) .await .map_err(|e| NodeError::RpcFailed(e.to_string()))?; Ok(start.elapsed()) } } /// A pool of node clients for connection reuse pub struct NodeClientPool { clients: RwLock>>, } impl NodeClientPool { /// Create a new empty client pool pub fn new() -> Self { Self { clients: RwLock::new(Vec::new()), } } /// Add a client to the pool pub async fn add(&self, client: Arc) { self.clients.write().await.push(client); } /// Get all clients in the pool pub async fn all(&self) -> Vec> { self.clients.read().await.clone() } /// Get all healthy clients pub async fn healthy(&self) -> Vec> { let clients = self.clients.read().await; let mut healthy = Vec::new(); for client in clients.iter() { if client.is_healthy().await { healthy.push(client.clone()); } } healthy } /// Get a client by node ID pub async fn get(&self, node_id: &str) -> Option> { self.clients .read() .await .iter() .find(|c| c.node_id() == node_id) .cloned() } /// Remove a client from the pool pub async fn remove(&self, node_id: &str) { self.clients .write() .await .retain(|c| c.node_id() != node_id); } /// Get the number of clients in the pool pub async fn len(&self) -> usize { self.clients.read().await.len() } /// Check if the pool is empty pub async fn is_empty(&self) -> bool { self.clients.read().await.is_empty() } } impl Default for NodeClientPool { fn default() -> Self { Self::new() } } #[cfg(test)] mod tests { use super::*; #[tokio::test] async fn test_node_client_creation() { let client = NodeClient::connect("http://localhost:9002").await.unwrap(); assert!(client.is_healthy().await); assert!(!client.node_id().is_empty()); } #[tokio::test] async fn test_node_client_health_toggle() { let client = NodeClient::connect("http://localhost:9002").await.unwrap(); assert!(client.is_healthy().await); client.mark_unhealthy(); assert!(!client.is_healthy().await); client.mark_healthy(); assert!(client.is_healthy().await); } #[tokio::test] async fn test_node_client_pool() { let pool = NodeClientPool::new(); assert!(pool.is_empty().await); let client1 = Arc::new(NodeClient::connect("http://node1:9002").await.unwrap()); let client2 = Arc::new(NodeClient::connect("http://node2:9002").await.unwrap()); pool.add(client1.clone()).await; pool.add(client2.clone()).await; assert_eq!(pool.len().await, 2); assert!(pool.get(client1.node_id()).await.is_some()); pool.remove(client1.node_id()).await; assert_eq!(pool.len().await, 1); assert!(pool.get(client1.node_id()).await.is_none()); } }