//! ObjectService gRPC implementation use crate::metadata::MetadataStore; use bytes::Bytes; use lightningstor_api::proto::{ AbortMultipartUploadRequest, CompleteMultipartUploadRequest, CompleteMultipartUploadResponse, CopyObjectRequest, CopyObjectResponse, CreateMultipartUploadRequest, CreateMultipartUploadResponse, DeleteObjectRequest, DeleteObjectResponse, GetObjectRequest, GetObjectResponse, HeadObjectRequest, HeadObjectResponse, ListMultipartUploadsRequest, ListMultipartUploadsResponse, ListObjectVersionsRequest, ListObjectVersionsResponse, ListObjectsRequest, ListObjectsResponse, ListPartsRequest, ListPartsResponse, ObjectInfo, ObjectMetadata as ProtoObjectMetadata, PutObjectRequest, PutObjectResponse, UploadPartRequest, UploadPartResponse, }; use lightningstor_api::ObjectService; use lightningstor_storage::StorageBackend; use lightningstor_types::{ BucketId, ETag, Object, ObjectKey, ObjectMetadata, ObjectVersion, Result as LightningStorResult, }; use prost_types; use std::str::FromStr; use md5::{Digest, Md5}; use std::sync::Arc; use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response, Status, Streaming}; /// ObjectService implementation pub struct ObjectServiceImpl { /// Storage backend for object data storage: Arc, /// Metadata store for object metadata metadata: Arc, } impl ObjectServiceImpl { /// Create a new ObjectService pub async fn new( storage: Arc, metadata: Arc, ) -> LightningStorResult { Ok(Self { storage, metadata }) } /// Convert LightningStor Error to gRPC Status fn to_status(err: lightningstor_types::Error) -> Status { Status::internal(err.to_string()) } /// Convert Object to ObjectInfo proto fn object_to_proto(&self, obj: &Object) -> ObjectInfo { ObjectInfo { key: obj.key.as_str().to_string(), etag: obj.etag.as_str().to_string(), size: obj.size, last_modified: Some(prost_types::Timestamp { seconds: obj.last_modified.timestamp(), nanos: obj.last_modified.timestamp_subsec_nanos() as i32, }), storage_class: obj.storage_class.clone(), version_id: obj.version.as_str().to_string(), is_latest: obj.is_latest, metadata: Some(ProtoObjectMetadata { content_type: obj.metadata.content_type.clone().unwrap_or_default(), content_encoding: obj.metadata.content_encoding.clone().unwrap_or_default(), content_disposition: obj.metadata.content_disposition.clone().unwrap_or_default(), content_language: obj.metadata.content_language.clone().unwrap_or_default(), cache_control: obj.metadata.cache_control.clone().unwrap_or_default(), user_metadata: obj.metadata.user_metadata.clone(), }), } } /// Calculate MD5 hash of data fn calculate_md5(data: &[u8]) -> ETag { let mut hasher = Md5::new(); hasher.update(data); let hash = hasher.finalize(); let hash_array: [u8; 16] = hash.into(); ETag::from_md5(&hash_array) } } #[tonic::async_trait] impl ObjectService for ObjectServiceImpl { type GetObjectStream = std::pin::Pin> + Send>>; async fn put_object( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); tracing::info!( bucket = %req.bucket, key = %req.key, size = req.body.len(), "PutObject request" ); // Load bucket // TODO: Extract org_id and project_id from request metadata/context // For now, assume they're in the bucket name or use default let org_id = "default"; // TODO: Get from request context let project_id = "default"; // TODO: Get from request context let bucket = self.metadata .load_bucket(org_id, project_id, &req.bucket) .await .map_err(Self::to_status)? .ok_or_else(|| Status::not_found(format!("Bucket {} not found", req.bucket)))?; let bucket_id = BucketId::from_str(&bucket.id.to_string()) .map_err(|_| Status::internal("Invalid bucket ID"))?; // Validate object key let object_key = ObjectKey::new(&req.key) .map_err(|e| Status::invalid_argument(format!("Invalid object key: {}", e)))?; // Calculate ETag let etag = Self::calculate_md5(&req.body); // Create object metadata let metadata = if let Some(proto_meta) = req.metadata { ObjectMetadata { content_type: if proto_meta.content_type.is_empty() { None } else { Some(proto_meta.content_type) }, content_encoding: if proto_meta.content_encoding.is_empty() { None } else { Some(proto_meta.content_encoding) }, content_disposition: if proto_meta.content_disposition.is_empty() { None } else { Some(proto_meta.content_disposition) }, content_language: if proto_meta.content_language.is_empty() { None } else { Some(proto_meta.content_language) }, cache_control: if proto_meta.cache_control.is_empty() { None } else { Some(proto_meta.cache_control) }, user_metadata: proto_meta.user_metadata, } } else { ObjectMetadata::default() }; // Create object let mut object = Object::new( bucket.id.to_string(), object_key.clone(), etag.clone(), req.body.len() as u64, metadata.content_type.clone(), ); object.metadata = metadata; // Handle versioning if bucket.versioning == lightningstor_types::Versioning::Enabled { object.version = ObjectVersion::new(); } // Save object data to storage backend self.storage .put_object(&object.id, Bytes::from(req.body)) .await .map_err(|e| Status::internal(format!("Failed to store object: {}", e)))?; // Save object metadata self.metadata .save_object(&object) .await .map_err(Self::to_status)?; tracing::info!( bucket = %req.bucket, key = %req.key, object_id = %object.id, etag = %etag.as_str(), "Object stored successfully" ); Ok(Response::new(PutObjectResponse { etag: etag.as_str().to_string(), version_id: object.version.as_str().to_string(), })) } async fn get_object( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); tracing::info!( bucket = %req.bucket, key = %req.key, "GetObject request" ); // Load bucket let org_id = "default"; // TODO: Get from request context let project_id = "default"; // TODO: Get from request context let bucket = self.metadata .load_bucket(org_id, project_id, &req.bucket) .await .map_err(Self::to_status)? .ok_or_else(|| Status::not_found(format!("Bucket {} not found", req.bucket)))?; let bucket_id = BucketId::from_str(&bucket.id.to_string()) .map_err(|_| Status::internal("Invalid bucket ID"))?; // Load object metadata let version_id = if req.version_id.is_empty() { None } else { Some(req.version_id.as_str()) }; let object = self.metadata .load_object(&bucket_id, &req.key, version_id) .await .map_err(Self::to_status)? .ok_or_else(|| Status::not_found(format!("Object {} not found", req.key)))?; // Check if delete marker if object.is_delete_marker { return Err(Status::not_found("Object is a delete marker")); } // Get object data from storage backend let data = self.storage .get_object(&object.id) .await .map_err(|e| Status::internal(format!("Failed to retrieve object: {}", e)))?; // Handle range request let (start, end) = if req.range_start >= 0 && req.range_end >= 0 { let start = req.range_start as usize; let end = if req.range_end >= req.range_start { (req.range_end as usize).min(data.len()) } else { data.len() }; (start.min(data.len()), end) } else { (0, data.len()) }; let chunk_size = 1024 * 1024; // 1MB chunks let (tx, rx) = tokio::sync::mpsc::channel(16); // Send metadata first let object_info = self.object_to_proto(&object); let _ = tx.send(Ok(GetObjectResponse { content: Some(lightningstor_api::proto::get_object_response::Content::Metadata(object_info)), })).await; // Clone data slice for async move block let data_slice = data[start..end].to_vec(); tokio::spawn(async move { for chunk in data_slice.chunks(chunk_size) { if tx.send(Ok(GetObjectResponse { content: Some(lightningstor_api::proto::get_object_response::Content::BodyChunk(chunk.to_vec())), })).await.is_err() { break; } } }); Ok(Response::new(Box::pin(ReceiverStream::new(rx)))) } async fn delete_object( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); tracing::info!( bucket = %req.bucket, key = %req.key, "DeleteObject request" ); let org_id = "default"; let project_id = "default"; // Load bucket let bucket = self.metadata .load_bucket(org_id, project_id, &req.bucket) .await .map_err(Self::to_status)? .ok_or_else(|| Status::not_found(format!("Bucket {} not found", req.bucket)))?; let bucket_id = BucketId::from_str(&bucket.id.to_string()) .map_err(|_| Status::internal("Invalid bucket ID"))?; // Parse version ID let version_id = if req.version_id.is_empty() { None } else { Some(req.version_id.as_str()) }; // Load object to get its storage ID let object = self.metadata .load_object(&bucket_id, &req.key, version_id) .await .map_err(Self::to_status)? .ok_or_else(|| Status::not_found(format!("Object {} not found", req.key)))?; // Delete from storage backend self.storage .delete_object(&object.id) .await .map_err(|e| Status::internal(format!("Failed to delete object data: {}", e)))?; // Delete from metadata store self.metadata .delete_object(&bucket_id, &req.key, version_id) .await .map_err(Self::to_status)?; tracing::info!( bucket = %req.bucket, key = %req.key, "Object deleted successfully" ); Ok(Response::new(DeleteObjectResponse { version_id: object.version.as_str().to_string(), delete_marker: object.is_delete_marker, })) } async fn head_object( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); tracing::info!( bucket = %req.bucket, key = %req.key, "HeadObject request" ); let org_id = "default"; let project_id = "default"; // Load bucket let bucket = self.metadata .load_bucket(org_id, project_id, &req.bucket) .await .map_err(Self::to_status)? .ok_or_else(|| Status::not_found(format!("Bucket {} not found", req.bucket)))?; let bucket_id = BucketId::from_str(&bucket.id.to_string()) .map_err(|_| Status::internal("Invalid bucket ID"))?; // Parse version ID let version_id = if req.version_id.is_empty() { None } else { Some(req.version_id.as_str()) }; // Load object metadata let object = self.metadata .load_object(&bucket_id, &req.key, version_id) .await .map_err(Self::to_status)? .ok_or_else(|| Status::not_found(format!("Object {} not found", req.key)))?; // Check if delete marker if object.is_delete_marker { return Err(Status::not_found("Object is a delete marker")); } Ok(Response::new(HeadObjectResponse { object: Some(self.object_to_proto(&object)), })) } async fn copy_object( &self, _request: Request, ) -> Result, Status> { Err(Status::unimplemented("CopyObject not yet implemented")) } async fn list_objects( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); tracing::info!( bucket = %req.bucket, prefix = %req.prefix, max_keys = req.max_keys, "ListObjects request" ); let org_id = "default"; let project_id = "default"; // Load bucket let bucket = self.metadata .load_bucket(org_id, project_id, &req.bucket) .await .map_err(Self::to_status)? .ok_or_else(|| Status::not_found(format!("Bucket {} not found", req.bucket)))?; let bucket_id = BucketId::from_str(&bucket.id.to_string()) .map_err(|_| Status::internal("Invalid bucket ID"))?; // Default max_keys to 1000 if not specified let max_keys = if req.max_keys > 0 { req.max_keys } else { 1000 }; // List objects from metadata store let objects = self.metadata .list_objects(&bucket_id, &req.prefix, max_keys) .await .map_err(Self::to_status)?; // Convert to proto objects let object_infos: Vec = objects .iter() .filter(|obj| !obj.is_delete_marker) .map(|obj| self.object_to_proto(obj)) .collect(); let is_truncated = object_infos.len() >= max_keys as usize; let next_continuation_token = if is_truncated { object_infos.last().map(|obj| obj.key.clone()) } else { None }; Ok(Response::new(ListObjectsResponse { objects: object_infos, common_prefixes: vec![], // TODO: Implement prefix grouping is_truncated, next_continuation_token: next_continuation_token.unwrap_or_default(), key_count: objects.len() as u32, })) } async fn list_object_versions( &self, _request: Request, ) -> Result, Status> { Err(Status::unimplemented( "ListObjectVersions not yet implemented", )) } async fn create_multipart_upload( &self, _request: Request, ) -> Result, Status> { Err(Status::unimplemented( "CreateMultipartUpload not yet implemented", )) } async fn upload_part( &self, _request: Request>, ) -> Result, Status> { Err(Status::unimplemented("UploadPart not yet implemented")) } async fn complete_multipart_upload( &self, _request: Request, ) -> Result, Status> { Err(Status::unimplemented( "CompleteMultipartUpload not yet implemented", )) } async fn abort_multipart_upload( &self, _request: Request, ) -> Result, Status> { Err(Status::unimplemented( "AbortMultipartUpload not yet implemented", )) } async fn list_parts( &self, _request: Request, ) -> Result, Status> { Err(Status::unimplemented("ListParts not yet implemented")) } async fn list_multipart_uploads( &self, _request: Request, ) -> Result, Status> { Err(Status::unimplemented( "ListMultipartUploads not yet implemented", )) } }