//! ObjectService gRPC implementation

use crate::metadata::MetadataStore;
use bytes::{Bytes, BytesMut};
use dashmap::DashMap;
use futures::stream;
use iam_service_auth::{get_tenant_context, resource_for_tenant, AuthService, TenantContext};
use lightningstor_api::proto::{
    AbortMultipartUploadRequest, CompleteMultipartUploadRequest,
    CompleteMultipartUploadResponse, CompletedPart, CopyObjectRequest, CopyObjectResponse,
    CreateMultipartUploadRequest, CreateMultipartUploadResponse, DeleteObjectRequest,
    DeleteObjectResponse, GetObjectRequest, GetObjectResponse, HeadObjectRequest,
    HeadObjectResponse, ListMultipartUploadsRequest, ListMultipartUploadsResponse,
    ListObjectVersionsRequest, ListObjectVersionsResponse, ListObjectsRequest,
    ListObjectsResponse, ListPartsRequest, ListPartsResponse, MultipartUploadInfo, ObjectInfo,
    ObjectMetadata as ProtoObjectMetadata, PartInfo, PutObjectRequest, PutObjectResponse,
    UploadPartRequest, UploadPartResponse,
};
use lightningstor_api::ObjectService;
use lightningstor_storage::StorageBackend;
use lightningstor_types::{
    Bucket, BucketId, ETag, MultipartUpload, Object, ObjectKey, ObjectMetadata, ObjectVersion,
    Part, PartNumber, Result as LightningStorResult,
};
use md5::{Digest, Md5};
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::Mutex;
use tonic::{Request, Response, Status, Streaming};

const OBJECT_STREAM_CHUNK_SIZE: usize = 8 * 1024 * 1024;
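// Note (review, assumption): 8 MiB body chunks are larger than tonic's
// default 4 MiB receive-message limit, so deployments presumably raise
// `max_decoding_message_size` on the generated clients (and the matching
// encode limit on the server) if defaults are otherwise unchanged. Worth
// verifying; nothing in this file configures it.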
/// ObjectService implementation
pub struct ObjectServiceImpl {
    /// Storage backend for object data
    storage: Arc<dyn StorageBackend>,
    /// Metadata store for object metadata
    metadata: Arc<MetadataStore>,
    /// Authorization service used for tenant-scoped access checks
    auth: Arc<AuthService>,
    /// Per-upload mutexes serializing multipart metadata updates
    multipart_locks: Arc<DashMap<String, Arc<Mutex<()>>>>,
}

/// A ListObjects page entry: either a concrete object or a common prefix
/// synthesized from the delimiter.
enum Entry<'a> {
    Object(&'a Object),
    Prefix(&'a str),
}

impl ObjectServiceImpl {
    /// Create a new ObjectService
    pub async fn new(
        storage: Arc<dyn StorageBackend>,
        metadata: Arc<MetadataStore>,
        auth: Arc<AuthService>,
    ) -> LightningStorResult<Self> {
        Ok(Self {
            storage,
            metadata,
            auth,
            multipart_locks: Arc::new(DashMap::new()),
        })
    }

    /// Convert a LightningStor error to a gRPC Status
    fn to_status(err: lightningstor_types::Error) -> Status {
        Status::internal(err.to_string())
    }

    /// Convert an Object to its ObjectInfo proto representation
    fn object_to_proto(&self, obj: &Object) -> ObjectInfo {
        ObjectInfo {
            key: obj.key.as_str().to_string(),
            etag: obj.etag.as_str().to_string(),
            size: obj.size,
            last_modified: Some(prost_types::Timestamp {
                seconds: obj.last_modified.timestamp(),
                nanos: obj.last_modified.timestamp_subsec_nanos() as i32,
            }),
            storage_class: obj.storage_class.clone(),
            version_id: obj.version.as_str().to_string(),
            is_latest: obj.is_latest,
            metadata: Some(ProtoObjectMetadata {
                content_type: obj.metadata.content_type.clone().unwrap_or_default(),
                content_encoding: obj.metadata.content_encoding.clone().unwrap_or_default(),
                content_disposition: obj
                    .metadata
                    .content_disposition
                    .clone()
                    .unwrap_or_default(),
                content_language: obj.metadata.content_language.clone().unwrap_or_default(),
                cache_control: obj.metadata.cache_control.clone().unwrap_or_default(),
                user_metadata: obj.metadata.user_metadata.clone(),
            }),
        }
    }

    /// Calculate the MD5 hash of `data` and wrap it as an ETag
    fn calculate_md5(data: &[u8]) -> ETag {
        let mut hasher = Md5::new();
        hasher.update(data);
        let hash = hasher.finalize();
        let hash_array: [u8; 16] = hash.into();
        ETag::from_md5(&hash_array)
    }

    /// Convert proto metadata to domain metadata, mapping empty strings to None
    fn proto_metadata_to_object_metadata(
        metadata: Option<ProtoObjectMetadata>,
    ) -> ObjectMetadata {
        if let Some(proto_meta) = metadata {
            ObjectMetadata {
                content_type: if proto_meta.content_type.is_empty() {
                    None
                } else {
                    Some(proto_meta.content_type)
                },
                content_encoding: if proto_meta.content_encoding.is_empty() {
                    None
                } else {
                    Some(proto_meta.content_encoding)
                },
                content_disposition: if proto_meta.content_disposition.is_empty() {
                    None
                } else {
                    Some(proto_meta.content_disposition)
                },
                content_language: if proto_meta.content_language.is_empty() {
                    None
                } else {
                    Some(proto_meta.content_language)
                },
                cache_control: if proto_meta.cache_control.is_empty() {
                    None
                } else {
                    Some(proto_meta.cache_control)
                },
                user_metadata: proto_meta.user_metadata,
            }
        } else {
            ObjectMetadata::default()
        }
    }

    /// Resolve a requested byte range against the object size.
    ///
    /// Returns a half-open `(start, end)` pair. `(0, 0)` or any negative
    /// bound selects the whole object; `end < start` reads to the end.
    fn resolve_range(total_len: usize, start: i64, end: i64) -> (usize, usize) {
        if start == 0 && end == 0 {
            return (0, total_len);
        }
        if start >= 0 && end >= 0 {
            let range_start = (start as usize).min(total_len);
            let range_end = if end >= start {
                (end as usize).min(total_len)
            } else {
                total_len
            };
            return (range_start, range_end);
        }
        (0, total_len)
    }

    /// Delete all stored parts belonging to a multipart upload
    async fn delete_multipart_parts(&self, upload: &MultipartUpload) -> Result<(), Status> {
        for part in &upload.parts {
            self.storage
                .delete_part(upload.upload_id.as_str(), part.part_number.as_u32())
                .await
                .map_err(|e| {
                    Status::internal(format!("Failed to delete multipart part: {}", e))
                })?;
        }
        self.storage
            .delete_upload_parts(upload.upload_id.as_str())
            .await
            .map_err(|e| Status::internal(format!("Failed to clean multipart upload: {}", e)))?;
        Ok(())
    }

    /// Build a GetObject stream that serves a multipart-backed object
    /// directly from its stored parts, honoring the requested byte range.
    fn multipart_object_stream(
        &self,
        object: &Object,
        upload: MultipartUpload,
        start: usize,
        end: usize,
    ) -> <Self as ObjectService>::GetObjectStream {
        let storage = self.storage.clone();
        // State: (storage, upload, pending metadata frame, next part index,
        // bytes of the object consumed so far).
        let state = (
            storage,
            upload,
            Some(self.object_to_proto(object)),
            0usize,
            0u64,
        );
        let range_start = start as u64;
        let range_end = end as u64;
        let object_size = object.size;
        Box::pin(stream::try_unfold(
            state,
            move |(storage, upload, object_info, next_part_index, consumed)| async move {
                // Emit the metadata frame first.
                if let Some(info) = object_info {
                    return Ok(Some((
                        GetObjectResponse {
                            content: Some(
                                lightningstor_api::proto::get_object_response::Content::Metadata(
                                    info,
                                ),
                            ),
                        },
                        (storage, upload, None, next_part_index, consumed),
                    )));
                }
                if range_start >= range_end || range_start >= object_size {
                    return Ok(None);
                }
                // Walk parts until one overlaps the requested range; each
                // overlapping part yields exactly one body chunk.
                let mut idx = next_part_index;
                let mut offset = consumed;
                while idx < upload.parts.len() {
                    let part = &upload.parts[idx];
                    let part_start = offset;
                    let part_end = part_start + part.size;
                    idx += 1;
                    offset = part_end;
                    if range_end <= part_start || range_start >= part_end {
                        continue;
                    }
                    let bytes = storage
                        .get_part(upload.upload_id.as_str(), part.part_number.as_u32())
                        .await
                        .map_err(|e| {
                            Status::internal(format!(
                                "Failed to retrieve multipart object part: {}",
                                e
                            ))
                        })?;
                    let body_start = range_start.saturating_sub(part_start) as usize;
                    let body_end = (range_end.min(part_end) - part_start) as usize;
                    if body_start > bytes.len() || body_end > bytes.len() || body_start > body_end
                    {
                        return Err(Status::internal(format!(
                            "Multipart part {} for upload {} is inconsistent: stored={} requested={}..{}",
                            part.part_number.as_u32(),
                            upload.upload_id.as_str(),
                            bytes.len(),
                            body_start,
                            body_end
                        )));
                    }
                    return Ok(Some((
                        GetObjectResponse {
                            content: Some(
                                lightningstor_api::proto::get_object_response::Content::BodyChunk(
                                    bytes.slice(body_start..body_end),
                                ),
                            ),
                        },
                        (storage, upload, None, idx, offset),
                    )));
                }
                Ok(None)
            },
        ))
    }
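    // Stream shape shared by both GetObject stream constructors (above and
    // below): the first frame carries `Content::Metadata`, and every
    // following frame carries a `Content::BodyChunk` until the requested
    // byte range is exhausted.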
    /// Build a GetObject stream over an in-memory object body, emitting the
    /// metadata frame followed by fixed-size chunks of the requested range.
    fn object_stream_from_bytes(
        &self,
        object: &Object,
        data: Bytes,
        start: usize,
        end: usize,
    ) -> <Self as ObjectService>::GetObjectStream {
        let range_start = start.min(data.len());
        let range_end = end.min(data.len());
        let state = (
            data,
            Some(self.object_to_proto(object)),
            range_start,
            range_end,
            OBJECT_STREAM_CHUNK_SIZE,
        );
        Box::pin(stream::try_unfold(
            state,
            move |(data, object_info, next_offset, range_end, chunk_size)| async move {
                if let Some(info) = object_info {
                    return Ok(Some((
                        GetObjectResponse {
                            content: Some(
                                lightningstor_api::proto::get_object_response::Content::Metadata(
                                    info,
                                ),
                            ),
                        },
                        (data, None, next_offset, range_end, chunk_size),
                    )));
                }
                if next_offset >= range_end {
                    return Ok(None);
                }
                let chunk_end = (next_offset + chunk_size).min(range_end);
                Ok(Some((
                    GetObjectResponse {
                        content: Some(
                            lightningstor_api::proto::get_object_response::Content::BodyChunk(
                                data.slice(next_offset..chunk_end),
                            ),
                        ),
                    },
                    (data, None, chunk_end, range_end, chunk_size),
                )))
            },
        ))
    }

    /// Load a bucket by name, scoped to the caller's tenant
    async fn load_bucket_for_tenant(
        &self,
        tenant: &TenantContext,
        bucket_name: &str,
    ) -> Result<Bucket, Status> {
        self.metadata
            .load_bucket(&tenant.org_id, &tenant.project_id, bucket_name)
            .await
            .map_err(Self::to_status)?
            .ok_or_else(|| Status::not_found(format!("Bucket {} not found", bucket_name)))
    }

    /// Authorize `action` on an object resource within the bucket's tenant
    async fn authorize_object_action(
        &self,
        tenant: &TenantContext,
        action: &str,
        bucket: &Bucket,
        object_id: &str,
    ) -> Result<(), Status> {
        let resource_id = format!("{}/{}", bucket.id, object_id);
        self.auth
            .authorize(
                tenant,
                action,
                &resource_for_tenant("object", resource_id, &bucket.org_id, &bucket.project_id),
            )
            .await
    }

    /// Load the full body of an object, reassembling multipart-backed
    /// objects from their stored parts.
    async fn load_full_object_bytes(&self, object: &Object) -> Result<Bytes, Status> {
        if let Some(upload) = self
            .metadata
            .load_object_multipart_upload(&object.id)
            .await
            .map_err(Self::to_status)?
        {
            let mut body = BytesMut::new();
            for part in &upload.parts {
                let bytes = self
                    .storage
                    .get_part(upload.upload_id.as_str(), part.part_number.as_u32())
                    .await
                    .map_err(|e| {
                        Status::internal(format!(
                            "Failed to retrieve multipart object part: {}",
                            e
                        ))
                    })?;
                body.extend_from_slice(bytes.as_ref());
            }
            if body.len() as u64 != object.size {
                return Err(Status::internal(format!(
                    "Multipart object {} has inconsistent size: expected {}, got {}",
                    object.id,
                    object.size,
                    body.len()
                )));
            }
            Ok(body.freeze())
        } else {
            self.storage
                .get_object(&object.id)
                .await
                .map_err(|e| Status::internal(format!("Failed to retrieve object: {}", e)))
        }
    }

    /// Get (or create) the per-upload mutex for `upload_id`
    fn multipart_lock(&self, upload_id: &str) -> Arc<Mutex<()>> {
        self.multipart_locks
            .entry(upload_id.to_string())
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone()
    }

    /// Remove the per-upload mutex once no other caller holds a clone.
    /// A strong count of 2 means only the map's Arc and the caller's own
    /// clone remain, so the entry can be dropped safely.
    fn drop_multipart_lock_if_idle(&self, upload_id: &str) {
        if let Some(entry) = self.multipart_locks.get(upload_id) {
            if Arc::strong_count(entry.value()) == 2 {
                // Release the shard guard before removing to avoid deadlock.
                drop(entry);
                self.multipart_locks.remove(upload_id);
            }
        }
    }
}
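// A minimal test sketch for the range semantics above. These cases mirror
// `resolve_range` exactly as written and assume nothing beyond this file.
#[cfg(test)]
mod resolve_range_tests {
    use super::*;

    #[test]
    fn resolve_range_semantics() {
        // (0, 0) means "no range": the whole object is selected.
        assert_eq!(ObjectServiceImpl::resolve_range(10, 0, 0), (0, 10));
        // A well-formed range is returned as a half-open interval.
        assert_eq!(ObjectServiceImpl::resolve_range(10, 2, 5), (2, 5));
        // end < start reads from start to the end of the object.
        assert_eq!(ObjectServiceImpl::resolve_range(10, 4, 2), (4, 10));
        // Negative bounds fall back to the full object.
        assert_eq!(ObjectServiceImpl::resolve_range(10, -1, 5), (0, 10));
        // Out-of-bounds ranges are clamped, yielding an empty selection.
        assert_eq!(ObjectServiceImpl::resolve_range(10, 20, 30), (10, 10));
    }
}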
const ACTION_OBJECTS_CREATE: &str = "storage:objects:create";
const ACTION_OBJECTS_READ: &str = "storage:objects:read";
const ACTION_OBJECTS_UPDATE: &str = "storage:objects:update";
const ACTION_OBJECTS_DELETE: &str = "storage:objects:delete";
const ACTION_OBJECTS_LIST: &str = "storage:objects:list";

#[tonic::async_trait]
impl ObjectService for ObjectServiceImpl {
    type GetObjectStream = std::pin::Pin<
        Box<dyn futures::Stream<Item = Result<GetObjectResponse, Status>> + Send>,
    >;

    async fn put_object(
        &self,
        request: Request<PutObjectRequest>,
    ) -> Result<Response<PutObjectResponse>, Status> {
        let tenant = get_tenant_context(&request)?;
        let req = request.into_inner();
        let body = req.body;
        let body_size = body.len() as u64;
        tracing::debug!(
            bucket = %req.bucket,
            key = %req.key,
            size = body_size,
            "PutObject request"
        );

        let bucket = self.load_bucket_for_tenant(&tenant, &req.bucket).await?;
        self.authorize_object_action(&tenant, ACTION_OBJECTS_CREATE, &bucket, &req.key)
            .await?;

        // Validate object key
        let object_key = ObjectKey::new(&req.key)
            .map_err(|e| Status::invalid_argument(format!("Invalid object key: {}", e)))?;

        // Calculate ETag
        let etag = Self::calculate_md5(&body);

        // Create object metadata
        let metadata = Self::proto_metadata_to_object_metadata(req.metadata);

        // Create object
        let mut object = Object::new(
            bucket.id.to_string(),
            object_key.clone(),
            etag.clone(),
            body_size,
            metadata.content_type.clone(),
        );
        object.metadata = metadata;

        // Handle versioning
        if bucket.versioning == lightningstor_types::Versioning::Enabled {
            object.version = ObjectVersion::new();
        }

        // Save object data to storage backend
        self.storage
            .put_object(&object.id, body)
            .await
            .map_err(|e| Status::internal(format!("Failed to store object: {}", e)))?;

        // Save object metadata
        self.metadata
            .save_object(&object)
            .await
            .map_err(Self::to_status)?;

        tracing::debug!(
            bucket = %req.bucket,
            key = %req.key,
            object_id = %object.id,
            etag = %etag.as_str(),
            "Object stored successfully"
        );

        Ok(Response::new(PutObjectResponse {
            etag: etag.as_str().to_string(),
            version_id: object.version.as_str().to_string(),
        }))
    }
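    // GetObject below never materializes a multipart-backed object in
    // memory: if the object maps to a retained multipart upload, the
    // response stream is served part-by-part; otherwise the body is loaded
    // once and chunked.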
    async fn get_object(
        &self,
        request: Request<GetObjectRequest>,
    ) -> Result<Response<Self::GetObjectStream>, Status> {
        let tenant = get_tenant_context(&request)?;
        let req = request.into_inner();
        tracing::debug!(
            bucket = %req.bucket,
            key = %req.key,
            "GetObject request"
        );

        let bucket = self.load_bucket_for_tenant(&tenant, &req.bucket).await?;
        self.authorize_object_action(&tenant, ACTION_OBJECTS_READ, &bucket, &req.key)
            .await?;

        let bucket_id: BucketId = BucketId::from_str(&bucket.id.to_string())
            .map_err(|_| Status::internal("Invalid bucket ID"))?;

        // Load object metadata
        let version_id = if req.version_id.is_empty() {
            None
        } else {
            Some(req.version_id.as_str())
        };
        let object = self
            .metadata
            .load_object(&bucket_id, &req.key, version_id)
            .await
            .map_err(Self::to_status)?
            .ok_or_else(|| Status::not_found(format!("Object {} not found", req.key)))?;

        // Check if delete marker
        if object.is_delete_marker {
            return Err(Status::not_found("Object is a delete marker"));
        }

        let (start, end) =
            Self::resolve_range(object.size as usize, req.range_start, req.range_end);

        if let Some(upload) = self
            .metadata
            .load_object_multipart_upload(&object.id)
            .await
            .map_err(Self::to_status)?
        {
            return Ok(Response::new(
                self.multipart_object_stream(&object, upload, start, end),
            ));
        }

        let data = self
            .storage
            .get_object(&object.id)
            .await
            .map_err(|e| Status::internal(format!("Failed to retrieve object: {}", e)))?;

        Ok(Response::new(
            self.object_stream_from_bytes(&object, data, start, end),
        ))
    }

    async fn delete_object(
        &self,
        request: Request<DeleteObjectRequest>,
    ) -> Result<Response<DeleteObjectResponse>, Status> {
        let tenant = get_tenant_context(&request)?;
        let req = request.into_inner();
        tracing::debug!(
            bucket = %req.bucket,
            key = %req.key,
            "DeleteObject request"
        );

        let bucket = self.load_bucket_for_tenant(&tenant, &req.bucket).await?;
        self.authorize_object_action(&tenant, ACTION_OBJECTS_DELETE, &bucket, &req.key)
            .await?;

        let bucket_id: BucketId = BucketId::from_str(&bucket.id.to_string())
            .map_err(|_| Status::internal("Invalid bucket ID"))?;

        // Parse version ID
        let version_id = if req.version_id.is_empty() {
            None
        } else {
            Some(req.version_id.as_str())
        };

        // Load object to get its storage ID
        let object = self
            .metadata
            .load_object(&bucket_id, &req.key, version_id)
            .await
            .map_err(Self::to_status)?
            .ok_or_else(|| Status::not_found(format!("Object {} not found", req.key)))?;

        // Multipart-backed objects have their parts and bookkeeping removed;
        // simple objects are deleted directly from the storage backend.
        if let Some(upload) = self
            .metadata
            .load_object_multipart_upload(&object.id)
            .await
            .map_err(Self::to_status)?
        {
            self.delete_multipart_parts(&upload).await?;
            self.metadata
                .delete_object_multipart_upload(&object.id)
                .await
                .map_err(Self::to_status)?;
            self.metadata
                .delete_multipart_upload(upload.upload_id.as_str())
                .await
                .map_err(Self::to_status)?;
        } else {
            self.storage
                .delete_object(&object.id)
                .await
                .map_err(|e| {
                    Status::internal(format!("Failed to delete object data: {}", e))
                })?;
        }

        // Delete from metadata store
        self.metadata
            .delete_object(&bucket_id, &req.key, version_id)
            .await
            .map_err(Self::to_status)?;

        tracing::debug!(
            bucket = %req.bucket,
            key = %req.key,
            "Object deleted successfully"
        );

        Ok(Response::new(DeleteObjectResponse {
            version_id: object.version.as_str().to_string(),
            delete_marker: object.is_delete_marker,
        }))
    }

    async fn head_object(
        &self,
        request: Request<HeadObjectRequest>,
    ) -> Result<Response<HeadObjectResponse>, Status> {
        let tenant = get_tenant_context(&request)?;
        let req = request.into_inner();
        tracing::debug!(
            bucket = %req.bucket,
            key = %req.key,
            "HeadObject request"
        );

        let bucket = self.load_bucket_for_tenant(&tenant, &req.bucket).await?;
        self.authorize_object_action(&tenant, ACTION_OBJECTS_READ, &bucket, &req.key)
            .await?;

        let bucket_id: BucketId = BucketId::from_str(&bucket.id.to_string())
            .map_err(|_| Status::internal("Invalid bucket ID"))?;

        // Parse version ID
        let version_id = if req.version_id.is_empty() {
            None
        } else {
            Some(req.version_id.as_str())
        };

        // Load object metadata
        let object = self
            .metadata
            .load_object(&bucket_id, &req.key, version_id)
            .await
            .map_err(Self::to_status)?
            .ok_or_else(|| Status::not_found(format!("Object {} not found", req.key)))?;

        // Check if delete marker
        if object.is_delete_marker {
            return Err(Status::not_found("Object is a delete marker"));
        }

        Ok(Response::new(HeadObjectResponse {
            object: Some(self.object_to_proto(&object)),
        }))
    }
    async fn copy_object(
        &self,
        request: Request<CopyObjectRequest>,
    ) -> Result<Response<CopyObjectResponse>, Status> {
        let tenant = get_tenant_context(&request)?;
        let req = request.into_inner();

        if req.source_bucket.is_empty() {
            return Err(Status::invalid_argument("source_bucket is required"));
        }
        if req.source_key.is_empty() {
            return Err(Status::invalid_argument("source_key is required"));
        }
        if req.dest_bucket.is_empty() {
            return Err(Status::invalid_argument("dest_bucket is required"));
        }
        if req.dest_key.is_empty() {
            return Err(Status::invalid_argument("dest_key is required"));
        }

        let source_bucket = self
            .load_bucket_for_tenant(&tenant, &req.source_bucket)
            .await?;
        self.authorize_object_action(
            &tenant,
            ACTION_OBJECTS_READ,
            &source_bucket,
            &req.source_key,
        )
        .await?;

        let dest_bucket = self
            .load_bucket_for_tenant(&tenant, &req.dest_bucket)
            .await?;
        self.authorize_object_action(&tenant, ACTION_OBJECTS_CREATE, &dest_bucket, &req.dest_key)
            .await?;

        let source_bucket_id: BucketId = BucketId::from_str(&source_bucket.id.to_string())
            .map_err(|_| Status::internal("Invalid source bucket ID"))?;
        let dest_key = ObjectKey::new(&req.dest_key)
            .map_err(|e| Status::invalid_argument(format!("Invalid destination key: {}", e)))?;

        let source_version_id = if req.source_version_id.is_empty() {
            None
        } else {
            Some(req.source_version_id.as_str())
        };
        let source_object = self
            .metadata
            .load_object(&source_bucket_id, &req.source_key, source_version_id)
            .await
            .map_err(Self::to_status)?
            .ok_or_else(|| {
                Status::not_found(format!("Object {} not found", req.source_key))
            })?;

        if source_object.is_delete_marker {
            return Err(Status::not_found("Source object is a delete marker"));
        }

        let data = self.load_full_object_bytes(&source_object).await?;

        let object_metadata = if req.metadata_directive_replace {
            Self::proto_metadata_to_object_metadata(req.metadata)
        } else {
            source_object.metadata.clone()
        };

        let mut dest_object = Object::new(
            dest_bucket.id.to_string(),
            dest_key,
            source_object.etag.clone(),
            source_object.size,
            object_metadata.content_type.clone(),
        );
        dest_object.metadata = object_metadata;
        dest_object.storage_class = source_object.storage_class.clone();

        if dest_bucket.versioning == lightningstor_types::Versioning::Enabled {
            dest_object.version = ObjectVersion::new();
        }

        self.storage
            .put_object(&dest_object.id, data)
            .await
            .map_err(|e| Status::internal(format!("Failed to store copied object: {}", e)))?;
        self.metadata
            .save_object(&dest_object)
            .await
            .map_err(Self::to_status)?;

        Ok(Response::new(CopyObjectResponse {
            etag: dest_object.etag.as_str().to_string(),
            version_id: dest_object.version.as_str().to_string(),
            last_modified: Some(prost_types::Timestamp {
                seconds: dest_object.last_modified.timestamp(),
                nanos: dest_object.last_modified.timestamp_subsec_nanos() as i32,
            }),
        }))
    }
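    // Delimiter grouping in `list_objects` below, by example: with
    // prefix = "photos/" and delimiter = "/", the keys
    //   ["photos/a.jpg", "photos/2024/b.jpg", "photos/2024/c.jpg"]
    // yield objects ["photos/a.jpg"] and common_prefixes ["photos/2024/"],
    // matching S3's CommonPrefixes behavior.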
    async fn list_objects(
        &self,
        request: Request<ListObjectsRequest>,
    ) -> Result<Response<ListObjectsResponse>, Status> {
        let tenant = get_tenant_context(&request)?;
        let req = request.into_inner();
        tracing::info!(
            bucket = %req.bucket,
            prefix = %req.prefix,
            max_keys = req.max_keys,
            "ListObjects request"
        );

        let bucket = self.load_bucket_for_tenant(&tenant, &req.bucket).await?;
        self.authorize_object_action(&tenant, ACTION_OBJECTS_LIST, &bucket, &req.prefix)
            .await?;

        let bucket_id: BucketId = BucketId::from_str(&bucket.id.to_string())
            .map_err(|_| Status::internal("Invalid bucket ID"))?;

        // Default max_keys to 1000 if not specified
        let max_keys = if req.max_keys > 0 { req.max_keys } else { 1000 };
        // A continuation token takes precedence over start_after.
        let start_after = if !req.continuation_token.is_empty() {
            req.continuation_token.as_str()
        } else if !req.start_after.is_empty() {
            req.start_after.as_str()
        } else {
            ""
        };

        // List objects from metadata store (fetch all to apply
        // delimiter/pagination locally)
        let mut objects = self
            .metadata
            .list_objects(&bucket_id, &req.prefix, 0)
            .await
            .map_err(Self::to_status)?;

        // Filter delete markers and apply start_after
        objects.retain(|obj| !obj.is_delete_marker);
        if !start_after.is_empty() {
            objects.retain(|obj| obj.key.as_str() > start_after);
        }
        // Ensure stable ordering
        objects.sort_by(|a, b| a.key.as_str().cmp(b.key.as_str()));

        let delimiter = req.delimiter.as_str();
        let has_delimiter = !delimiter.is_empty();

        // Build entries (objects + common prefixes) in lexicographic order
        let mut common_prefixes = std::collections::BTreeSet::new();
        let mut object_entries = Vec::new();
        if has_delimiter {
            for obj in &objects {
                let key = obj.key.as_str();
                let relative = key.strip_prefix(req.prefix.as_str()).unwrap_or(key);
                if let Some(pos) = relative.find(delimiter) {
                    let common_prefix =
                        format!("{}{}{}", req.prefix, &relative[..pos], delimiter);
                    common_prefixes.insert(common_prefix);
                } else {
                    object_entries.push(obj);
                }
            }
        } else {
            object_entries.extend(objects.iter());
        }

        let mut entries: Vec<(String, Entry<'_>)> = Vec::new();
        for obj in object_entries {
            entries.push((obj.key.as_str().to_string(), Entry::Object(obj)));
        }
        for prefix in &common_prefixes {
            entries.push((prefix.clone(), Entry::Prefix(prefix.as_str())));
        }
        entries.sort_by(|a, b| a.0.cmp(&b.0));

        let is_truncated = entries.len() > max_keys as usize;
        let limited_entries = entries.into_iter().take(max_keys as usize);

        let mut object_infos = Vec::new();
        let mut prefixes = Vec::new();
        let mut last_key = None;
        for (key, entry) in limited_entries {
            last_key = Some(key);
            match entry {
                Entry::Object(obj) => object_infos.push(self.object_to_proto(obj)),
                Entry::Prefix(prefix) => prefixes.push(prefix.to_string()),
            }
        }

        let next_continuation_token = if is_truncated {
            last_key.unwrap_or_default()
        } else {
            String::new()
        };

        Ok(Response::new(ListObjectsResponse {
            key_count: (object_infos.len() + prefixes.len()) as u32,
            objects: object_infos,
            common_prefixes: prefixes,
            is_truncated,
            next_continuation_token,
        }))
    }

    async fn list_object_versions(
        &self,
        _request: Request<ListObjectVersionsRequest>,
    ) -> Result<Response<ListObjectVersionsResponse>, Status> {
        Err(Status::unimplemented(
            "ListObjectVersions not yet implemented",
        ))
    }

    async fn create_multipart_upload(
        &self,
        request: Request<CreateMultipartUploadRequest>,
    ) -> Result<Response<CreateMultipartUploadResponse>, Status> {
        let tenant = get_tenant_context(&request)?;
        let req = request.into_inner();

        let bucket = self.load_bucket_for_tenant(&tenant, &req.bucket).await?;
        self.authorize_object_action(&tenant, ACTION_OBJECTS_CREATE, &bucket, &req.key)
            .await?;

        let object_key = ObjectKey::new(&req.key)
            .map_err(|e| Status::invalid_argument(format!("Invalid object key: {}", e)))?;

        let mut upload = MultipartUpload::new(bucket.id.to_string(), object_key);
        upload.metadata = Self::proto_metadata_to_object_metadata(req.metadata);

        self.metadata
            .save_multipart_upload(&upload)
            .await
            .map_err(Self::to_status)?;

        Ok(Response::new(CreateMultipartUploadResponse {
            bucket: req.bucket,
            key: req.key,
            upload_id: upload.upload_id.as_str().to_string(),
        }))
    }
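    // Typical multipart flow against this service (illustrative):
    //   1. CreateMultipartUpload { bucket, key }         -> upload_id
    //   2. UploadPart (client-streamed body) per part    -> per-part etag
    //   3. CompleteMultipartUpload { upload_id, parts }  -> combined "-N" etag
    // or AbortMultipartUpload to discard the uploaded parts.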
    async fn upload_part(
        &self,
        request: Request<Streaming<UploadPartRequest>>,
    ) -> Result<Response<UploadPartResponse>, Status> {
        let tenant = get_tenant_context(&request)?;
        let mut stream = request.into_inner();
        let first = stream
            .message()
            .await?
            .ok_or_else(|| Status::invalid_argument("UploadPart stream is empty"))?;

        let bucket = self.load_bucket_for_tenant(&tenant, &first.bucket).await?;
        self.authorize_object_action(&tenant, ACTION_OBJECTS_UPDATE, &bucket, &first.key)
            .await?;

        // Validate the upload under the per-upload lock, then release it
        // while the body is streamed in and written to storage.
        let upload_lock = self.multipart_lock(&first.upload_id);
        {
            let _guard = upload_lock.lock().await;
            let upload = self
                .metadata
                .load_multipart_upload(&first.upload_id)
                .await
                .map_err(Self::to_status)?
                .ok_or_else(|| Status::not_found("multipart upload not found"))?;
            if upload.bucket_id != bucket.id.to_string() || upload.key.as_str() != first.key {
                return Err(Status::failed_precondition(
                    "multipart upload does not match bucket/key",
                ));
            }
        }

        let part_number = PartNumber::new(first.part_number)
            .map_err(|e| Status::invalid_argument(e.to_string()))?;
        let mut body = BytesMut::from(first.body.as_ref());
        let declared_md5 = first.content_md5;

        // Accumulate the remaining chunks, rejecting any that change the
        // upload's identity mid-stream.
        while let Some(chunk) = stream.message().await? {
            if !chunk.bucket.is_empty() && chunk.bucket != first.bucket {
                return Err(Status::invalid_argument(
                    "bucket changed within UploadPart stream",
                ));
            }
            if !chunk.key.is_empty() && chunk.key != first.key {
                return Err(Status::invalid_argument(
                    "key changed within UploadPart stream",
                ));
            }
            if !chunk.upload_id.is_empty() && chunk.upload_id != first.upload_id {
                return Err(Status::invalid_argument(
                    "upload_id changed within UploadPart stream",
                ));
            }
            if chunk.part_number != 0 && chunk.part_number != first.part_number {
                return Err(Status::invalid_argument(
                    "part_number changed within UploadPart stream",
                ));
            }
            body.extend_from_slice(chunk.body.as_ref());
        }

        let etag = Self::calculate_md5(&body);
        if !declared_md5.is_empty() && declared_md5 != etag.as_str() {
            return Err(Status::invalid_argument("content_md5 mismatch"));
        }
        let body = body.freeze();
        let body_size = body.len() as u64;

        self.storage
            .put_part(first.upload_id.as_str(), part_number.as_u32(), body)
            .await
            .map_err(|e| Status::internal(format!("Failed to store multipart part: {}", e)))?;

        // Re-acquire the lock and re-validate before recording the part: the
        // upload may have been completed or aborted while the body was being
        // written.
        let _guard = upload_lock.lock().await;
        let mut upload = self
            .metadata
            .load_multipart_upload(&first.upload_id)
            .await
            .map_err(Self::to_status)?
            .ok_or_else(|| Status::not_found("multipart upload not found"))?;
        if upload.bucket_id != bucket.id.to_string() || upload.key.as_str() != first.key {
            // Best-effort cleanup of the orphaned part data.
            let _ = self
                .storage
                .delete_part(first.upload_id.as_str(), part_number.as_u32())
                .await;
            return Err(Status::failed_precondition(
                "multipart upload does not match bucket/key",
            ));
        }

        // Re-uploading a part number replaces the previous entry.
        upload.parts.retain(|part| part.part_number != part_number);
        upload.parts.push(Part {
            part_number,
            etag: etag.clone(),
            size: body_size,
            last_modified: chrono::Utc::now(),
        });
        upload.parts.sort_by_key(|part| part.part_number);

        self.metadata
            .save_multipart_upload(&upload)
            .await
            .map_err(Self::to_status)?;

        drop(_guard);
        self.drop_multipart_lock_if_idle(upload.upload_id.as_str());

        Ok(Response::new(UploadPartResponse {
            etag: etag.as_str().to_string(),
        }))
    }
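    // Note on the lock protocol above: the per-upload mutex guards only the
    // metadata read-modify-write, not the body transfer, so multiple parts
    // of the same upload can stream concurrently while their bookkeeping
    // updates remain serialized.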
    async fn complete_multipart_upload(
        &self,
        request: Request<CompleteMultipartUploadRequest>,
    ) -> Result<Response<CompleteMultipartUploadResponse>, Status> {
        let tenant = get_tenant_context(&request)?;
        let req = request.into_inner();

        let bucket = self.load_bucket_for_tenant(&tenant, &req.bucket).await?;
        self.authorize_object_action(&tenant, ACTION_OBJECTS_UPDATE, &bucket, &req.key)
            .await?;

        let upload_lock = self.multipart_lock(&req.upload_id);
        let _guard = upload_lock.lock().await;

        let mut upload = self
            .metadata
            .load_multipart_upload(&req.upload_id)
            .await
            .map_err(Self::to_status)?
            .ok_or_else(|| Status::not_found("multipart upload not found"))?;
        if upload.bucket_id != bucket.id.to_string() || upload.key.as_str() != req.key {
            return Err(Status::failed_precondition(
                "multipart upload does not match bucket/key",
            ));
        }

        // If the client did not list parts, complete with all uploaded parts.
        let mut completed_parts: Vec<CompletedPart> = if req.parts.is_empty() {
            upload
                .parts
                .iter()
                .map(|part| CompletedPart {
                    part_number: part.part_number.as_u32(),
                    etag: part.etag.as_str().to_string(),
                })
                .collect()
        } else {
            req.parts
        };
        completed_parts.sort_by_key(|part| part.part_number);

        // Every listed part must exist with a matching etag.
        let mut selected_parts = Vec::with_capacity(completed_parts.len());
        for completed in &completed_parts {
            let expected_number = PartNumber::new(completed.part_number)
                .map_err(|e| Status::invalid_argument(e.to_string()))?;
            let part = upload
                .parts
                .iter()
                .find(|part| part.part_number == expected_number)
                .ok_or_else(|| Status::failed_precondition("multipart part is missing"))?;
            if part.etag.as_str() != completed.etag {
                return Err(Status::failed_precondition("multipart part etag mismatch"));
            }
            selected_parts.push(part.clone());
        }

        let etags: Vec<ETag> = selected_parts
            .iter()
            .map(|part| part.etag.clone())
            .collect();
        let multipart_etag = ETag::multipart(&etags, selected_parts.len());
        upload.parts = selected_parts;

        let mut object = Object::new(
            bucket.id.to_string(),
            upload.key.clone(),
            multipart_etag.clone(),
            upload.parts.iter().map(|part| part.size).sum(),
            upload.metadata.content_type.clone(),
        );
        object.metadata = upload.metadata.clone();

        if bucket.versioning == lightningstor_types::Versioning::Enabled {
            object.version = ObjectVersion::new();
        }

        // Record the object -> upload mapping so reads can stitch the parts
        // back together, then retire the in-progress upload record.
        self.metadata
            .save_object_multipart_upload(&object.id, &upload)
            .await
            .map_err(Self::to_status)?;
        self.metadata
            .save_object(&object)
            .await
            .map_err(Self::to_status)?;
        self.metadata
            .delete_multipart_upload(upload.upload_id.as_str())
            .await
            .map_err(Self::to_status)?;

        drop(_guard);
        self.drop_multipart_lock_if_idle(&req.upload_id);

        Ok(Response::new(CompleteMultipartUploadResponse {
            bucket: req.bucket,
            key: req.key,
            etag: multipart_etag.as_str().to_string(),
            version_id: object.version.as_str().to_string(),
        }))
    }
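    // Completion never rewrites the parts into a single blob: GetObject
    // serves completed multipart objects via `multipart_object_stream`, and
    // DeleteObject/CopyObject handle the part-backed layout explicitly.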
    async fn abort_multipart_upload(
        &self,
        request: Request<AbortMultipartUploadRequest>,
    ) -> Result<Response<()>, Status> {
        let tenant = get_tenant_context(&request)?;
        let req = request.into_inner();

        let bucket = self.load_bucket_for_tenant(&tenant, &req.bucket).await?;
        self.authorize_object_action(&tenant, ACTION_OBJECTS_DELETE, &bucket, &req.key)
            .await?;

        let upload_lock = self.multipart_lock(&req.upload_id);
        let _guard = upload_lock.lock().await;

        // Aborting an unknown upload is a no-op rather than an error.
        if let Some(upload) = self
            .metadata
            .load_multipart_upload(&req.upload_id)
            .await
            .map_err(Self::to_status)?
        {
            if upload.bucket_id != bucket.id.to_string() || upload.key.as_str() != req.key {
                return Err(Status::failed_precondition(
                    "multipart upload does not match bucket/key",
                ));
            }
            self.delete_multipart_parts(&upload).await?;
            self.metadata
                .delete_multipart_upload(upload.upload_id.as_str())
                .await
                .map_err(Self::to_status)?;
        }

        drop(_guard);
        self.drop_multipart_lock_if_idle(&req.upload_id);

        Ok(Response::new(()))
    }

    async fn list_parts(
        &self,
        request: Request<ListPartsRequest>,
    ) -> Result<Response<ListPartsResponse>, Status> {
        let tenant = get_tenant_context(&request)?;
        let req = request.into_inner();

        let bucket = self.load_bucket_for_tenant(&tenant, &req.bucket).await?;
        self.authorize_object_action(&tenant, ACTION_OBJECTS_READ, &bucket, &req.key)
            .await?;

        let upload = self
            .metadata
            .load_multipart_upload(&req.upload_id)
            .await
            .map_err(Self::to_status)?
            .ok_or_else(|| Status::not_found("multipart upload not found"))?;
        if upload.bucket_id != bucket.id.to_string() || upload.key.as_str() != req.key {
            return Err(Status::failed_precondition(
                "multipart upload does not match bucket/key",
            ));
        }

        // Default max_parts to 1000 if not specified
        let max_parts = if req.max_parts > 0 { req.max_parts } else { 1000 };
        let remaining_count = upload
            .parts
            .iter()
            .filter(|part| part.part_number.as_u32() > req.part_number_marker)
            .count();
        let parts = upload
            .parts
            .iter()
            .filter(|part| part.part_number.as_u32() > req.part_number_marker)
            .take(max_parts as usize)
            .map(|part| PartInfo {
                part_number: part.part_number.as_u32(),
                etag: part.etag.as_str().to_string(),
                size: part.size,
                last_modified: Some(prost_types::Timestamp {
                    seconds: part.last_modified.timestamp(),
                    nanos: part.last_modified.timestamp_subsec_nanos() as i32,
                }),
            })
            .collect::<Vec<_>>();
        let is_truncated = remaining_count > parts.len();
        let next_part_number_marker = parts.last().map(|part| part.part_number).unwrap_or(0);

        Ok(Response::new(ListPartsResponse {
            bucket: req.bucket,
            key: req.key,
            upload_id: req.upload_id,
            parts,
            is_truncated,
            next_part_number_marker,
        }))
    }

    async fn list_multipart_uploads(
        &self,
        request: Request<ListMultipartUploadsRequest>,
    ) -> Result<Response<ListMultipartUploadsResponse>, Status> {
        let tenant = get_tenant_context(&request)?;
        let req = request.into_inner();

        let bucket = self.load_bucket_for_tenant(&tenant, &req.bucket).await?;
        self.authorize_object_action(&tenant, ACTION_OBJECTS_LIST, &bucket, &req.prefix)
            .await?;

        let bucket_id: BucketId = BucketId::from_str(&bucket.id.to_string())
            .map_err(|_| Status::internal("Invalid bucket ID"))?;

        // Default max_uploads to 1000 if not specified
        let max_uploads = if req.max_uploads > 0 { req.max_uploads } else { 1000 };
        let uploads = self
            .metadata
            .list_multipart_uploads(&bucket_id, &req.prefix, max_uploads)
            .await
            .map_err(Self::to_status)?;

        Ok(Response::new(ListMultipartUploadsResponse {
            bucket: req.bucket,
            uploads: uploads
                .into_iter()
                .map(|upload| MultipartUploadInfo {
                    key: upload.key.as_str().to_string(),
                    upload_id: upload.upload_id.as_str().to_string(),
                    initiated: Some(prost_types::Timestamp {
                        seconds: upload.initiated.timestamp(),
                        nanos: upload.initiated.timestamp_subsec_nanos() as i32,
                    }),
                })
                .collect(),
            common_prefixes: Vec::new(),
            is_truncated: false,
            next_key_marker: String::new(),
            next_upload_id_marker: String::new(),
        }))
    }
}
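// A minimal sketch (assumption) of the conventional S3 multipart ETag:
// MD5 over the concatenation of the parts' raw 16-byte MD5 digests,
// suffixed with "-<part count>". Whether `ETag::multipart` computes exactly
// this is an assumption, not confirmed by this file; the helper name
// `s3_style_multipart_etag` is hypothetical and the sketch is illustrative
// only.
#[cfg(test)]
mod multipart_etag_sketch {
    use md5::{Digest, Md5};

    fn s3_style_multipart_etag(part_digests: &[[u8; 16]]) -> String {
        let mut hasher = Md5::new();
        for digest in part_digests {
            hasher.update(digest);
        }
        let combined: [u8; 16] = hasher.finalize().into();
        let hex: String = combined.iter().map(|b| format!("{:02x}", b)).collect();
        format!("{}-{}", hex, part_digests.len())
    }

    #[test]
    fn multipart_etag_has_part_count_suffix() {
        let etag = s3_style_multipart_etag(&[[0u8; 16], [1u8; 16]]);
        assert!(etag.ends_with("-2"));
        // 32 hex chars + '-' + single-digit part count
        assert_eq!(etag.len(), 34);
    }
}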