//! gRPC service implementation for storage node use crate::proto::{ node_service_server::NodeService, BatchGetChunksRequest, BatchPutChunksResponse, ChunkExistsRequest, ChunkExistsResponse, ChunkSizeRequest, ChunkSizeResponse, DeleteChunkRequest, GetChunkRequest, GetChunkResponse, GetStatusRequest, GetStatusResponse, PingRequest, PingResponse, PutChunkRequest, PutChunkResponse, }; use crate::storage::LocalChunkStore; use crate::NodeConfig; use std::sync::Arc; use std::time::Instant; use tokio::task::JoinSet; use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response, Status, Streaming}; use tracing::{debug, error}; const BATCH_IO_PARALLELISM: usize = 32; /// Implementation of the NodeService gRPC service pub struct NodeServiceImpl { /// Local chunk storage store: Arc, /// Node configuration config: Arc, /// Server start time start_time: Instant, } impl NodeServiceImpl { /// Create a new node service pub fn new(store: Arc, config: Arc) -> Self { Self { store, config, start_time: Instant::now(), } } fn chunk_read_status(chunk_id: &str, error: crate::storage::StorageError) -> Status { match error { crate::storage::StorageError::NotFound(_) => Status::not_found(format!( "Chunk not found: {}", chunk_id )), other => Status::internal(other.to_string()), } } } #[tonic::async_trait] impl NodeService for NodeServiceImpl { async fn put_chunk( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); debug!( chunk_id = %req.chunk_id, shard_index = req.shard_index, is_parity = req.is_parity, size = req.data.len(), "PutChunk request" ); let size = self .store .put(&req.chunk_id, &req.data) .await .map_err(|e| { error!(error = ?e, "Failed to put chunk"); Status::internal(e.to_string()) })?; metrics::counter!("node_chunks_stored").increment(1); metrics::counter!("node_bytes_stored").increment(size); Ok(Response::new(PutChunkResponse { size })) } async fn get_chunk( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); debug!( chunk_id = %req.chunk_id, shard_index = req.shard_index, is_parity = req.is_parity, "GetChunk request" ); let data = self.store.get(&req.chunk_id).await.map_err(|e| { match &e { crate::storage::StorageError::NotFound(_) => { debug!(chunk_id = %req.chunk_id, "Chunk not found"); Status::not_found(e.to_string()) } _ => { error!(error = ?e, "Failed to get chunk"); Status::internal(e.to_string()) } } })?; metrics::counter!("node_chunks_retrieved").increment(1); metrics::counter!("node_bytes_retrieved").increment(data.len() as u64); Ok(Response::new(GetChunkResponse { data: data.into(), size: 0, // Size is implicit from data.len() })) } async fn delete_chunk( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); debug!(chunk_id = %req.chunk_id, "DeleteChunk request"); self.store.delete(&req.chunk_id).await.map_err(|e| { error!(error = ?e, "Failed to delete chunk"); Status::internal(e.to_string()) })?; metrics::counter!("node_chunks_deleted").increment(1); Ok(Response::new(())) } async fn chunk_exists( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); let exists = self.store.exists(&req.chunk_id); Ok(Response::new(ChunkExistsResponse { exists })) } async fn chunk_size( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); match self.store.size(&req.chunk_id) { Some(size) => Ok(Response::new(ChunkSizeResponse { size, exists: true })), None => Ok(Response::new(ChunkSizeResponse { size: 0, exists: false, })), } } async fn ping(&self, _request: Request) -> Result, Status> { let start = Instant::now(); // Minimal processing - just measure latency let latency_us = start.elapsed().as_micros() as u64; Ok(Response::new(PingResponse { latency_us })) } async fn get_status( &self, _request: Request, ) -> Result, Status> { let uptime_seconds = self.start_time.elapsed().as_secs(); Ok(Response::new(GetStatusResponse { node_id: self.config.node_id.clone(), endpoint: self.config.grpc_addr.to_string(), zone: self.config.zone.clone(), region: self.config.region.clone(), capacity_bytes: self.store.max_capacity(), used_bytes: self.store.total_bytes(), chunk_count: self.store.chunk_count(), healthy: true, uptime_seconds, })) } async fn batch_put_chunks( &self, request: Request>, ) -> Result, Status> { let mut stream = request.into_inner(); let mut success_count = 0u32; let mut failure_count = 0u32; let mut errors = Vec::new(); let mut in_flight = JoinSet::new(); while let Some(req) = stream.message().await? { while in_flight.len() >= BATCH_IO_PARALLELISM { record_batch_put_result( in_flight.join_next().await, &mut success_count, &mut failure_count, &mut errors, ); } let store = self.store.clone(); in_flight.spawn(async move { let chunk_id = req.chunk_id; let result = store.put(&chunk_id, &req.data).await; (chunk_id, result) }); } while !in_flight.is_empty() { record_batch_put_result( in_flight.join_next().await, &mut success_count, &mut failure_count, &mut errors, ); } Ok(Response::new(BatchPutChunksResponse { success_count, failure_count, errors, })) } type BatchGetChunksStream = ReceiverStream>; async fn batch_get_chunks( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); let (tx, rx) = tokio::sync::mpsc::channel(32); let store = self.store.clone(); tokio::spawn(async move { let chunks = req.chunks; let chunk_count = chunks.len(); let mut results: Vec>> = (0..chunk_count).map(|_| None).collect(); let mut readers = JoinSet::new(); let mut next_index = 0usize; let spawn_reader = |readers: &mut JoinSet<(usize, Result)>, index: usize, chunk_req: crate::proto::GetChunkRequest| { let store = store.clone(); readers.spawn(async move { let chunk_id = chunk_req.chunk_id; let result = store .get(&chunk_id) .await .map(|data| GetChunkResponse { size: data.len() as u64, data: data.into(), }) .map_err(|error| Self::chunk_read_status(&chunk_id, error)); (index, result) }); }; while next_index < chunk_count && readers.len() < BATCH_IO_PARALLELISM { spawn_reader(&mut readers, next_index, chunks[next_index].clone()); next_index += 1; } while let Some(reader) = readers.join_next().await { match reader { Ok((index, result)) => { results[index] = Some(result); } Err(join_error) => { let status = Status::internal(format!( "batch get task failed: {}", join_error )); let index = results.iter().position(|entry| entry.is_none()).unwrap_or(0); results[index] = Some(Err(status)); } } if next_index < chunk_count { spawn_reader(&mut readers, next_index, chunks[next_index].clone()); next_index += 1; } } for result in results { let Some(result) = result else { let _ = tx.send(Err(Status::internal("batch get result missing"))).await; break; }; if tx.send(result).await.is_err() { break; } } }); Ok(Response::new(ReceiverStream::new(rx))) } } fn record_batch_put_result( joined: Option), tokio::task::JoinError>>, success_count: &mut u32, failure_count: &mut u32, errors: &mut Vec, ) { let Some(result) = joined else { return; }; match result { Ok((_chunk_id, Ok(size))) => { *success_count += 1; metrics::counter!("node_chunks_stored").increment(1); metrics::counter!("node_bytes_stored").increment(size); } Ok((chunk_id, Err(error))) => { *failure_count += 1; errors.push(format!("{}: {}", chunk_id, error)); } Err(join_error) => { *failure_count += 1; errors.push(format!("join error: {}", join_error)); } } }