//! ObjectService gRPC implementation: S3-style object CRUD, range reads,
//! listing, and multipart uploads backed by a pluggable storage backend and
//! a metadata store.
use crate::metadata::MetadataStore;
use bytes::{Bytes, BytesMut};
use dashmap::DashMap;
use futures::stream;
use iam_service_auth::{get_tenant_context, resource_for_tenant, AuthService, TenantContext};
use lightningstor_api::proto::{
AbortMultipartUploadRequest, CompleteMultipartUploadRequest, CompleteMultipartUploadResponse,
CompletedPart, CopyObjectRequest, CopyObjectResponse, CreateMultipartUploadRequest,
CreateMultipartUploadResponse, DeleteObjectRequest, DeleteObjectResponse, GetObjectRequest,
GetObjectResponse, HeadObjectRequest, HeadObjectResponse, ListMultipartUploadsRequest,
ListMultipartUploadsResponse, ListObjectVersionsRequest, ListObjectVersionsResponse,
ListObjectsRequest, ListObjectsResponse, ListPartsRequest, ListPartsResponse,
MultipartUploadInfo, ObjectInfo, ObjectMetadata as ProtoObjectMetadata, PartInfo,
PutObjectRequest, PutObjectResponse, UploadPartRequest, UploadPartResponse,
};
use lightningstor_api::ObjectService;
use lightningstor_storage::StorageBackend;
use lightningstor_types::{
Bucket, BucketId, ETag, MultipartUpload, Object, ObjectKey, ObjectMetadata, ObjectVersion,
Part, PartNumber, Result as LightningStorResult,
};
use md5::{Digest, Md5};
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::Mutex;
use tonic::{Request, Response, Status, Streaming};
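/// Size of each streamed `GetObject` body chunk (8 MiB).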
const OBJECT_STREAM_CHUNK_SIZE: usize = 8 * 1024 * 1024;
/// gRPC handler implementing the `ObjectService` API
pub struct ObjectServiceImpl {
/// Storage backend for object data
storage: Arc<dyn StorageBackend>,
/// Metadata store for object metadata
metadata: Arc<MetadataStore>,
/// Authorization service for tenant-scoped access checks
auth: Arc<AuthService>,
/// Per-upload locks serializing multipart metadata updates
multipart_locks: Arc<DashMap<String, Arc<Mutex<()>>>>,
}
/// A single `list_objects` result entry: either a concrete object or a
/// delimiter-derived common prefix.
enum Entry<'a> {
Object(&'a Object),
Prefix(&'a str),
}
impl ObjectServiceImpl {
/// Create a new ObjectService
pub async fn new(
storage: Arc<dyn StorageBackend>,
metadata: Arc<MetadataStore>,
auth: Arc<AuthService>,
) -> LightningStorResult<Self> {
Ok(Self {
storage,
metadata,
auth,
multipart_locks: Arc::new(DashMap::new()),
})
}
/// Convert LightningStor Error to gRPC Status
fn to_status(err: lightningstor_types::Error) -> Status {
Status::internal(err.to_string())
}
/// Convert Object to ObjectInfo proto
fn object_to_proto(&self, obj: &Object) -> ObjectInfo {
ObjectInfo {
key: obj.key.as_str().to_string(),
etag: obj.etag.as_str().to_string(),
size: obj.size,
last_modified: Some(prost_types::Timestamp {
seconds: obj.last_modified.timestamp(),
nanos: obj.last_modified.timestamp_subsec_nanos() as i32,
}),
storage_class: obj.storage_class.clone(),
version_id: obj.version.as_str().to_string(),
is_latest: obj.is_latest,
metadata: Some(ProtoObjectMetadata {
content_type: obj.metadata.content_type.clone().unwrap_or_default(),
content_encoding: obj.metadata.content_encoding.clone().unwrap_or_default(),
content_disposition: obj.metadata.content_disposition.clone().unwrap_or_default(),
content_language: obj.metadata.content_language.clone().unwrap_or_default(),
cache_control: obj.metadata.cache_control.clone().unwrap_or_default(),
user_metadata: obj.metadata.user_metadata.clone(),
}),
}
}
/// Calculate MD5 hash of data
fn calculate_md5(data: &[u8]) -> ETag {
let mut hasher = Md5::new();
hasher.update(data);
let hash = hasher.finalize();
let hash_array: [u8; 16] = hash.into();
ETag::from_md5(&hash_array)
}
fn proto_metadata_to_object_metadata(metadata: Option<ProtoObjectMetadata>) -> ObjectMetadata {
// Proto3 strings default to "", which maps back to None here.
fn non_empty(value: String) -> Option<String> {
if value.is_empty() {
None
} else {
Some(value)
}
}
match metadata {
Some(proto_meta) => ObjectMetadata {
content_type: non_empty(proto_meta.content_type),
content_encoding: non_empty(proto_meta.content_encoding),
content_disposition: non_empty(proto_meta.content_disposition),
content_language: non_empty(proto_meta.content_language),
cache_control: non_empty(proto_meta.cache_control),
user_metadata: proto_meta.user_metadata,
},
None => ObjectMetadata::default(),
}
}
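/// Resolve a requested byte range against the object size.
///
/// `(0, 0)` means "no range requested" and any negative bound falls back to
/// the full object. `end` is an exclusive bound clamped to `total_len`, and
/// a non-negative `end` smaller than `start` reads through to the end of the
/// object. For example, `resolve_range(100, 10, 20)` yields `(10, 20)` and
/// `resolve_range(100, 10, 500)` yields `(10, 100)`.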
fn resolve_range(total_len: usize, start: i64, end: i64) -> (usize, usize) {
if start == 0 && end == 0 {
return (0, total_len);
}
if start >= 0 && end >= 0 {
let range_start = (start as usize).min(total_len);
let range_end = if end >= start {
(end as usize).min(total_len)
} else {
total_len
};
return (range_start, range_end);
}
(0, total_len)
}
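/// Delete every stored part for a multipart upload, then ask the storage
/// backend to clean up whatever per-upload part state remains.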
async fn delete_multipart_parts(&self, upload: &MultipartUpload) -> Result<(), Status> {
for part in &upload.parts {
self.storage
.delete_part(upload.upload_id.as_str(), part.part_number.as_u32())
.await
.map_err(|e| Status::internal(format!("Failed to delete multipart part: {}", e)))?;
}
self.storage
.delete_upload_parts(upload.upload_id.as_str())
.await
.map_err(|e| Status::internal(format!("Failed to clean multipart upload: {}", e)))?;
Ok(())
}
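/// Stream a multipart object's body for `GetObject` without materializing
/// the whole object in memory.
///
/// The `try_unfold` state is `(storage, upload, pending metadata frame,
/// next part index, bytes consumed so far)`: the first poll emits the
/// metadata frame, and each later poll fetches the next stored part that
/// overlaps the requested `[start, end)` range and slices it to the overlap.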
fn multipart_object_stream(
&self,
object: &Object,
upload: MultipartUpload,
start: usize,
end: usize,
) -> <Self as ObjectService>::GetObjectStream {
let storage = self.storage.clone();
let state = (
storage,
upload,
Some(self.object_to_proto(object)),
0usize,
0u64,
);
let range_start = start as u64;
let range_end = end as u64;
let object_size = object.size;
Box::pin(stream::try_unfold(
state,
move |(storage, upload, object_info, next_part_index, consumed)| async move {
if let Some(info) = object_info {
return Ok(Some((
GetObjectResponse {
content: Some(
lightningstor_api::proto::get_object_response::Content::Metadata(
info,
),
),
},
(storage, upload, None, next_part_index, consumed),
)));
}
if range_start >= range_end || range_start >= object_size {
return Ok(None);
}
let mut idx = next_part_index;
let mut offset = consumed;
while idx < upload.parts.len() {
let part = &upload.parts[idx];
let part_start = offset;
let part_end = part_start + part.size;
idx += 1;
offset = part_end;
if range_end <= part_start || range_start >= part_end {
continue;
}
let bytes = storage
.get_part(upload.upload_id.as_str(), part.part_number.as_u32())
.await
.map_err(|e| {
Status::internal(format!(
"Failed to retrieve multipart object part: {}",
e
))
})?;
let body_start = range_start.saturating_sub(part_start) as usize;
let body_end = (range_end.min(part_end) - part_start) as usize;
if body_start > bytes.len() || body_end > bytes.len() || body_start > body_end {
return Err(Status::internal(format!(
"Multipart part {} for upload {} is inconsistent: stored={} requested={}..{}",
part.part_number.as_u32(),
upload.upload_id.as_str(),
bytes.len(),
body_start,
body_end
)));
}
return Ok(Some((
GetObjectResponse {
content: Some(
lightningstor_api::proto::get_object_response::Content::BodyChunk(
bytes.slice(body_start..body_end),
),
),
},
(storage, upload, None, idx, offset),
)));
}
Ok(None)
},
))
}
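/// Stream an in-memory object body as a metadata frame followed by body
/// chunks of at most `OBJECT_STREAM_CHUNK_SIZE` bytes.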
fn object_stream_from_bytes(
&self,
object: &Object,
data: Bytes,
start: usize,
end: usize,
) -> <Self as ObjectService>::GetObjectStream {
let range_start = start.min(data.len());
let range_end = end.min(data.len());
let state = (
data,
Some(self.object_to_proto(object)),
range_start,
range_end,
OBJECT_STREAM_CHUNK_SIZE,
);
Box::pin(stream::try_unfold(
state,
move |(data, object_info, next_offset, range_end, chunk_size)| async move {
if let Some(info) = object_info {
return Ok(Some((
GetObjectResponse {
content: Some(
lightningstor_api::proto::get_object_response::Content::Metadata(
info,
),
),
},
(data, None, next_offset, range_end, chunk_size),
)));
}
if next_offset >= range_end {
return Ok(None);
}
let chunk_end = (next_offset + chunk_size).min(range_end);
Ok(Some((
GetObjectResponse {
content: Some(
lightningstor_api::proto::get_object_response::Content::BodyChunk(
data.slice(next_offset..chunk_end),
),
),
},
(data, None, chunk_end, range_end, chunk_size),
)))
},
))
}
async fn load_bucket_for_tenant(
&self,
tenant: &TenantContext,
bucket_name: &str,
) -> Result<Bucket, Status> {
self.metadata
.load_bucket(&tenant.org_id, &tenant.project_id, bucket_name)
.await
.map_err(Self::to_status)?
.ok_or_else(|| Status::not_found(format!("Bucket {} not found", bucket_name)))
}
async fn authorize_object_action(
&self,
tenant: &TenantContext,
action: &str,
bucket: &Bucket,
object_id: &str,
) -> Result<(), Status> {
let resource_id = format!("{}/{}", bucket.id, object_id);
self.auth
.authorize(
tenant,
action,
&resource_for_tenant("object", resource_id, &bucket.org_id, &bucket.project_id),
)
.await
}
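/// Load an object's complete body, reassembling it from its stored parts
/// when the object was created via multipart upload.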
async fn load_full_object_bytes(&self, object: &Object) -> Result<Bytes, Status> {
if let Some(upload) = self
.metadata
.load_object_multipart_upload(&object.id)
.await
.map_err(Self::to_status)?
{
let mut body = BytesMut::new();
for part in &upload.parts {
let bytes = self
.storage
.get_part(upload.upload_id.as_str(), part.part_number.as_u32())
.await
.map_err(|e| {
Status::internal(format!("Failed to retrieve multipart object part: {}", e))
})?;
body.extend_from_slice(bytes.as_ref());
}
if body.len() as u64 != object.size {
return Err(Status::internal(format!(
"Multipart object {} has inconsistent size: expected {}, got {}",
object.id,
object.size,
body.len()
)));
}
Ok(body.freeze())
} else {
self.storage
.get_object(&object.id)
.await
.map_err(|e| Status::internal(format!("Failed to retrieve object: {}", e)))
}
}
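/// Fetch (or lazily create) the per-upload mutex that serializes multipart
/// metadata updates for a given upload ID.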
fn multipart_lock(&self, upload_id: &str) -> Arc<Mutex<()>> {
self.multipart_locks
.entry(upload_id.to_string())
.or_insert_with(|| Arc::new(Mutex::new(())))
.clone()
}
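/// Drop the per-upload lock entry once it is idle. A strong count of
/// exactly 2 means the only remaining handles are the map's own Arc and the
/// clone held by the caller performing this cleanup, so the entry can be
/// removed; a later request will simply recreate it.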
fn drop_multipart_lock_if_idle(&self, upload_id: &str) {
if let Some(entry) = self.multipart_locks.get(upload_id) {
if Arc::strong_count(entry.value()) == 2 {
// Release the read guard before mutating the map: holding it while
// calling `remove` on the same DashMap shard would deadlock.
drop(entry);
self.multipart_locks.remove(upload_id);
}
}
}
}
const ACTION_OBJECTS_CREATE: &str = "storage:objects:create";
const ACTION_OBJECTS_READ: &str = "storage:objects:read";
const ACTION_OBJECTS_UPDATE: &str = "storage:objects:update";
const ACTION_OBJECTS_DELETE: &str = "storage:objects:delete";
const ACTION_OBJECTS_LIST: &str = "storage:objects:list";
#[tonic::async_trait]
impl ObjectService for ObjectServiceImpl {
type GetObjectStream = std::pin::Pin<
Box<dyn tokio_stream::Stream<Item = Result<GetObjectResponse, Status>> + Send>,
>;
async fn put_object(
&self,
request: Request<PutObjectRequest>,
) -> Result<Response<PutObjectResponse>, Status> {
let tenant = get_tenant_context(&request)?;
let req = request.into_inner();
let body = req.body;
let body_size = body.len() as u64;
tracing::debug!(
bucket = %req.bucket,
key = %req.key,
size = body_size,
"PutObject request"
);
let bucket = self.load_bucket_for_tenant(&tenant, &req.bucket).await?;
self.authorize_object_action(&tenant, ACTION_OBJECTS_CREATE, &bucket, &req.key)
.await?;
// Validate object key
let object_key = ObjectKey::new(&req.key)
.map_err(|e| Status::invalid_argument(format!("Invalid object key: {}", e)))?;
// Calculate ETag
let etag = Self::calculate_md5(&body);
// Create object metadata
let metadata = Self::proto_metadata_to_object_metadata(req.metadata);
// Create object
let mut object = Object::new(
bucket.id.to_string(),
object_key.clone(),
etag.clone(),
body_size,
metadata.content_type.clone(),
);
object.metadata = metadata;
// Handle versioning
if bucket.versioning == lightningstor_types::Versioning::Enabled {
object.version = ObjectVersion::new();
}
// Save object data to storage backend
self.storage
.put_object(&object.id, body)
.await
.map_err(|e| Status::internal(format!("Failed to store object: {}", e)))?;
// Save object metadata
self.metadata
.save_object(&object)
.await
.map_err(Self::to_status)?;
tracing::debug!(
bucket = %req.bucket,
key = %req.key,
object_id = %object.id,
etag = %etag.as_str(),
"Object stored successfully"
);
Ok(Response::new(PutObjectResponse {
etag: etag.as_str().to_string(),
version_id: object.version.as_str().to_string(),
}))
}
async fn get_object(
&self,
request: Request<GetObjectRequest>,
) -> Result<Response<Self::GetObjectStream>, Status> {
let tenant = get_tenant_context(&request)?;
let req = request.into_inner();
tracing::debug!(
bucket = %req.bucket,
key = %req.key,
"GetObject request"
);
let bucket = self.load_bucket_for_tenant(&tenant, &req.bucket).await?;
self.authorize_object_action(&tenant, ACTION_OBJECTS_READ, &bucket, &req.key)
.await?;
let bucket_id: BucketId = BucketId::from_str(&bucket.id.to_string())
.map_err(|_| Status::internal("Invalid bucket ID"))?;
// Load object metadata
let version_id = if req.version_id.is_empty() {
None
} else {
Some(req.version_id.as_str())
};
let object = self
.metadata
.load_object(&bucket_id, &req.key, version_id)
.await
.map_err(Self::to_status)?
.ok_or_else(|| Status::not_found(format!("Object {} not found", req.key)))?;
// Check if delete marker
if object.is_delete_marker {
return Err(Status::not_found("Object is a delete marker"));
}
let (start, end) =
Self::resolve_range(object.size as usize, req.range_start, req.range_end);
if let Some(upload) = self
.metadata
.load_object_multipart_upload(&object.id)
.await
.map_err(Self::to_status)?
{
return Ok(Response::new(
self.multipart_object_stream(&object, upload, start, end),
));
}
let data = self
.storage
.get_object(&object.id)
.await
.map_err(|e| Status::internal(format!("Failed to retrieve object: {}", e)))?;
Ok(Response::new(
self.object_stream_from_bytes(&object, data, start, end),
))
}
async fn delete_object(
&self,
request: Request<DeleteObjectRequest>,
) -> Result<Response<DeleteObjectResponse>, Status> {
let tenant = get_tenant_context(&request)?;
let req = request.into_inner();
tracing::debug!(
bucket = %req.bucket,
key = %req.key,
"DeleteObject request"
);
let bucket = self.load_bucket_for_tenant(&tenant, &req.bucket).await?;
self.authorize_object_action(&tenant, ACTION_OBJECTS_DELETE, &bucket, &req.key)
.await?;
let bucket_id: BucketId = BucketId::from_str(&bucket.id.to_string())
.map_err(|_| Status::internal("Invalid bucket ID"))?;
// Parse version ID
let version_id = if req.version_id.is_empty() {
None
} else {
Some(req.version_id.as_str())
};
// Load object to get its storage ID
let object = self
.metadata
.load_object(&bucket_id, &req.key, version_id)
.await
.map_err(Self::to_status)?
.ok_or_else(|| Status::not_found(format!("Object {} not found", req.key)))?;
if let Some(upload) = self
.metadata
.load_object_multipart_upload(&object.id)
.await
.map_err(Self::to_status)?
{
self.delete_multipart_parts(&upload).await?;
self.metadata
.delete_object_multipart_upload(&object.id)
.await
.map_err(Self::to_status)?;
self.metadata
.delete_multipart_upload(upload.upload_id.as_str())
.await
.map_err(Self::to_status)?;
} else {
self.storage
.delete_object(&object.id)
.await
.map_err(|e| Status::internal(format!("Failed to delete object data: {}", e)))?;
}
// Delete from metadata store
self.metadata
.delete_object(&bucket_id, &req.key, version_id)
.await
.map_err(Self::to_status)?;
tracing::debug!(
bucket = %req.bucket,
key = %req.key,
"Object deleted successfully"
);
Ok(Response::new(DeleteObjectResponse {
version_id: object.version.as_str().to_string(),
delete_marker: object.is_delete_marker,
}))
}
async fn head_object(
&self,
request: Request<HeadObjectRequest>,
) -> Result<Response<HeadObjectResponse>, Status> {
let tenant = get_tenant_context(&request)?;
let req = request.into_inner();
tracing::debug!(
bucket = %req.bucket,
key = %req.key,
"HeadObject request"
);
let bucket = self.load_bucket_for_tenant(&tenant, &req.bucket).await?;
self.authorize_object_action(&tenant, ACTION_OBJECTS_READ, &bucket, &req.key)
.await?;
let bucket_id: BucketId = BucketId::from_str(&bucket.id.to_string())
.map_err(|_| Status::internal("Invalid bucket ID"))?;
// Parse version ID
let version_id = if req.version_id.is_empty() {
None
} else {
Some(req.version_id.as_str())
};
// Load object metadata
let object = self
.metadata
.load_object(&bucket_id, &req.key, version_id)
.await
.map_err(Self::to_status)?
.ok_or_else(|| Status::not_found(format!("Object {} not found", req.key)))?;
// Check if delete marker
if object.is_delete_marker {
return Err(Status::not_found("Object is a delete marker"));
}
Ok(Response::new(HeadObjectResponse {
object: Some(self.object_to_proto(&object)),
}))
}
async fn copy_object(
&self,
request: Request<CopyObjectRequest>,
) -> Result<Response<CopyObjectResponse>, Status> {
let tenant = get_tenant_context(&request)?;
let req = request.into_inner();
if req.source_bucket.is_empty() {
return Err(Status::invalid_argument("source_bucket is required"));
}
if req.source_key.is_empty() {
return Err(Status::invalid_argument("source_key is required"));
}
if req.dest_bucket.is_empty() {
return Err(Status::invalid_argument("dest_bucket is required"));
}
if req.dest_key.is_empty() {
return Err(Status::invalid_argument("dest_key is required"));
}
let source_bucket = self
.load_bucket_for_tenant(&tenant, &req.source_bucket)
.await?;
self.authorize_object_action(
&tenant,
ACTION_OBJECTS_READ,
&source_bucket,
&req.source_key,
)
.await?;
let dest_bucket = self
.load_bucket_for_tenant(&tenant, &req.dest_bucket)
.await?;
self.authorize_object_action(&tenant, ACTION_OBJECTS_CREATE, &dest_bucket, &req.dest_key)
.await?;
let source_bucket_id: BucketId = BucketId::from_str(&source_bucket.id.to_string())
.map_err(|_| Status::internal("Invalid source bucket ID"))?;
let dest_key = ObjectKey::new(&req.dest_key)
.map_err(|e| Status::invalid_argument(format!("Invalid destination key: {}", e)))?;
let source_version_id = if req.source_version_id.is_empty() {
None
} else {
Some(req.source_version_id.as_str())
};
let source_object = self
.metadata
.load_object(&source_bucket_id, &req.source_key, source_version_id)
.await
.map_err(Self::to_status)?
.ok_or_else(|| Status::not_found(format!("Object {} not found", req.source_key)))?;
if source_object.is_delete_marker {
return Err(Status::not_found("Source object is a delete marker"));
}
let data = self.load_full_object_bytes(&source_object).await?;
let object_metadata = if req.metadata_directive_replace {
Self::proto_metadata_to_object_metadata(req.metadata)
} else {
source_object.metadata.clone()
};
let mut dest_object = Object::new(
dest_bucket.id.to_string(),
dest_key,
source_object.etag.clone(),
source_object.size,
object_metadata.content_type.clone(),
);
dest_object.metadata = object_metadata;
dest_object.storage_class = source_object.storage_class.clone();
if dest_bucket.versioning == lightningstor_types::Versioning::Enabled {
dest_object.version = ObjectVersion::new();
}
self.storage
.put_object(&dest_object.id, data)
.await
.map_err(|e| Status::internal(format!("Failed to store copied object: {}", e)))?;
self.metadata
.save_object(&dest_object)
.await
.map_err(Self::to_status)?;
Ok(Response::new(CopyObjectResponse {
etag: dest_object.etag.as_str().to_string(),
version_id: dest_object.version.as_str().to_string(),
last_modified: Some(prost_types::Timestamp {
seconds: dest_object.last_modified.timestamp(),
nanos: dest_object.last_modified.timestamp_subsec_nanos() as i32,
}),
}))
}
async fn list_objects(
&self,
request: Request<ListObjectsRequest>,
) -> Result<Response<ListObjectsResponse>, Status> {
let tenant = get_tenant_context(&request)?;
let req = request.into_inner();
tracing::info!(
bucket = %req.bucket,
prefix = %req.prefix,
max_keys = req.max_keys,
"ListObjects request"
);
let bucket = self.load_bucket_for_tenant(&tenant, &req.bucket).await?;
self.authorize_object_action(&tenant, ACTION_OBJECTS_LIST, &bucket, &req.prefix)
.await?;
let bucket_id: BucketId = BucketId::from_str(&bucket.id.to_string())
.map_err(|_| Status::internal("Invalid bucket ID"))?;
// Default max_keys to 1000 if not specified
let max_keys = if req.max_keys > 0 { req.max_keys } else { 1000 };
let start_after = if !req.continuation_token.is_empty() {
req.continuation_token.as_str()
} else if !req.start_after.is_empty() {
req.start_after.as_str()
} else {
""
};
// List objects from metadata store (fetch all to apply delimiter/pagination locally)
let mut objects = self
.metadata
.list_objects(&bucket_id, &req.prefix, 0)
.await
.map_err(Self::to_status)?;
// Filter delete markers and apply start_after
objects.retain(|obj| !obj.is_delete_marker);
if !start_after.is_empty() {
objects.retain(|obj| obj.key.as_str() > start_after);
}
// Ensure stable ordering
objects.sort_by(|a, b| a.key.as_str().cmp(b.key.as_str()));
let delimiter = req.delimiter.as_str();
let has_delimiter = !delimiter.is_empty();
// Build entries (objects + common prefixes) in lexicographic order
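// e.g. with prefix "photos/" and delimiter "/", the keys "photos/2024/a.jpg"
// and "photos/2024/b.jpg" roll up into the single common prefix "photos/2024/".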
let mut common_prefixes = std::collections::BTreeSet::new();
let mut object_entries = Vec::new();
if has_delimiter {
for obj in &objects {
let key = obj.key.as_str();
let relative = key.strip_prefix(req.prefix.as_str()).unwrap_or(key);
if let Some(pos) = relative.find(delimiter) {
let common_prefix = format!("{}{}{}", req.prefix, &relative[..pos], delimiter);
common_prefixes.insert(common_prefix);
} else {
object_entries.push(obj);
}
}
} else {
object_entries.extend(objects.iter());
}
let mut entries: Vec<(String, Entry<'_>)> = Vec::new();
for obj in object_entries {
entries.push((obj.key.as_str().to_string(), Entry::Object(obj)));
}
for prefix in &common_prefixes {
entries.push((prefix.clone(), Entry::Prefix(prefix.as_str())));
}
entries.sort_by(|a, b| a.0.cmp(&b.0));
let is_truncated = entries.len() > max_keys as usize;
let limited_entries = entries.into_iter().take(max_keys as usize);
let mut object_infos = Vec::new();
let mut prefixes = Vec::new();
let mut last_key = None;
for (key, entry) in limited_entries {
last_key = Some(key);
match entry {
Entry::Object(obj) => object_infos.push(self.object_to_proto(obj)),
Entry::Prefix(prefix) => prefixes.push(prefix.to_string()),
}
}
let next_continuation_token = if is_truncated {
last_key.unwrap_or_default()
} else {
String::new()
};
Ok(Response::new(ListObjectsResponse {
key_count: (object_infos.len() + prefixes.len()) as u32,
objects: object_infos,
common_prefixes: prefixes,
is_truncated,
next_continuation_token,
}))
}
async fn list_object_versions(
&self,
_request: Request<ListObjectVersionsRequest>,
) -> Result<Response<ListObjectVersionsResponse>, Status> {
Err(Status::unimplemented(
"ListObjectVersions not yet implemented",
))
}
async fn create_multipart_upload(
&self,
request: Request<CreateMultipartUploadRequest>,
) -> Result<Response<CreateMultipartUploadResponse>, Status> {
let tenant = get_tenant_context(&request)?;
let req = request.into_inner();
let bucket = self.load_bucket_for_tenant(&tenant, &req.bucket).await?;
self.authorize_object_action(&tenant, ACTION_OBJECTS_CREATE, &bucket, &req.key)
.await?;
let object_key = ObjectKey::new(&req.key)
.map_err(|e| Status::invalid_argument(format!("Invalid object key: {}", e)))?;
let mut upload = MultipartUpload::new(bucket.id.to_string(), object_key);
upload.metadata = Self::proto_metadata_to_object_metadata(req.metadata);
self.metadata
.save_multipart_upload(&upload)
.await
.map_err(Self::to_status)?;
Ok(Response::new(CreateMultipartUploadResponse {
bucket: req.bucket,
key: req.key,
upload_id: upload.upload_id.as_str().to_string(),
}))
}
async fn upload_part(
&self,
request: Request<Streaming<UploadPartRequest>>,
) -> Result<Response<UploadPartResponse>, Status> {
let tenant = get_tenant_context(&request)?;
let mut stream = request.into_inner();
let first = stream
.message()
.await?
.ok_or_else(|| Status::invalid_argument("UploadPart stream is empty"))?;
let bucket = self.load_bucket_for_tenant(&tenant, &first.bucket).await?;
self.authorize_object_action(&tenant, ACTION_OBJECTS_UPDATE, &bucket, &first.key)
.await?;
let upload_lock = self.multipart_lock(&first.upload_id);
{
let _guard = upload_lock.lock().await;
let upload = self
.metadata
.load_multipart_upload(&first.upload_id)
.await
.map_err(Self::to_status)?
.ok_or_else(|| Status::not_found("multipart upload not found"))?;
if upload.bucket_id != bucket.id.to_string() || upload.key.as_str() != first.key {
return Err(Status::failed_precondition(
"multipart upload does not match bucket/key",
));
}
}
let part_number = PartNumber::new(first.part_number)
.map_err(|e| Status::invalid_argument(e.to_string()))?;
let mut body = BytesMut::from(first.body.as_ref());
let declared_md5 = first.content_md5;
while let Some(chunk) = stream.message().await? {
if !chunk.bucket.is_empty() && chunk.bucket != first.bucket {
return Err(Status::invalid_argument(
"bucket changed within UploadPart stream",
));
}
if !chunk.key.is_empty() && chunk.key != first.key {
return Err(Status::invalid_argument(
"key changed within UploadPart stream",
));
}
if !chunk.upload_id.is_empty() && chunk.upload_id != first.upload_id {
return Err(Status::invalid_argument(
"upload_id changed within UploadPart stream",
));
}
if chunk.part_number != 0 && chunk.part_number != first.part_number {
return Err(Status::invalid_argument(
"part_number changed within UploadPart stream",
));
}
body.extend_from_slice(chunk.body.as_ref());
}
let etag = Self::calculate_md5(&body);
if !declared_md5.is_empty() && declared_md5 != etag.as_str() {
return Err(Status::invalid_argument("content_md5 mismatch"));
}
let body = body.freeze();
let body_size = body.len() as u64;
self.storage
.put_part(first.upload_id.as_str(), part_number.as_u32(), body)
.await
.map_err(|e| Status::internal(format!("Failed to store multipart part: {}", e)))?;
let _guard = upload_lock.lock().await;
let mut upload = self
.metadata
.load_multipart_upload(&first.upload_id)
.await
.map_err(Self::to_status)?
.ok_or_else(|| Status::not_found("multipart upload not found"))?;
if upload.bucket_id != bucket.id.to_string() || upload.key.as_str() != first.key {
let _ = self
.storage
.delete_part(first.upload_id.as_str(), part_number.as_u32())
.await;
return Err(Status::failed_precondition(
"multipart upload does not match bucket/key",
));
}
upload.parts.retain(|part| part.part_number != part_number);
upload.parts.push(Part {
part_number,
etag: etag.clone(),
size: body_size,
last_modified: chrono::Utc::now(),
});
upload.parts.sort_by_key(|part| part.part_number);
self.metadata
.save_multipart_upload(&upload)
.await
.map_err(Self::to_status)?;
drop(_guard);
self.drop_multipart_lock_if_idle(upload.upload_id.as_str());
Ok(Response::new(UploadPartResponse {
etag: etag.as_str().to_string(),
}))
}
async fn complete_multipart_upload(
&self,
request: Request<CompleteMultipartUploadRequest>,
) -> Result<Response<CompleteMultipartUploadResponse>, Status> {
let tenant = get_tenant_context(&request)?;
let req = request.into_inner();
let bucket = self.load_bucket_for_tenant(&tenant, &req.bucket).await?;
self.authorize_object_action(&tenant, ACTION_OBJECTS_UPDATE, &bucket, &req.key)
.await?;
let upload_lock = self.multipart_lock(&req.upload_id);
let _guard = upload_lock.lock().await;
let mut upload = self
.metadata
.load_multipart_upload(&req.upload_id)
.await
.map_err(Self::to_status)?
.ok_or_else(|| Status::not_found("multipart upload not found"))?;
if upload.bucket_id != bucket.id.to_string() || upload.key.as_str() != req.key {
return Err(Status::failed_precondition(
"multipart upload does not match bucket/key",
));
}
let mut completed_parts: Vec<CompletedPart> = if req.parts.is_empty() {
upload
.parts
.iter()
.map(|part| CompletedPart {
part_number: part.part_number.as_u32(),
etag: part.etag.as_str().to_string(),
})
.collect()
} else {
req.parts
};
completed_parts.sort_by_key(|part| part.part_number);
let mut selected_parts = Vec::with_capacity(completed_parts.len());
for completed in &completed_parts {
let expected_number = PartNumber::new(completed.part_number)
.map_err(|e| Status::invalid_argument(e.to_string()))?;
let part = upload
.parts
.iter()
.find(|part| part.part_number == expected_number)
.ok_or_else(|| Status::failed_precondition("multipart part is missing"))?;
if part.etag.as_str() != completed.etag {
return Err(Status::failed_precondition("multipart part etag mismatch"));
}
selected_parts.push(part.clone());
}
let etags: Vec<ETag> = selected_parts
.iter()
.map(|part| part.etag.clone())
.collect();
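// ETag::multipart is assumed to follow the S3 convention: the MD5 of the
// concatenated per-part digests, suffixed with "-<part count>".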
let multipart_etag = ETag::multipart(&etags, selected_parts.len());
upload.parts = selected_parts;
let mut object = Object::new(
bucket.id.to_string(),
upload.key.clone(),
multipart_etag.clone(),
upload.parts.iter().map(|part| part.size).sum(),
upload.metadata.content_type.clone(),
);
object.metadata = upload.metadata.clone();
if bucket.versioning == lightningstor_types::Versioning::Enabled {
object.version = ObjectVersion::new();
}
self.metadata
.save_object_multipart_upload(&object.id, &upload)
.await
.map_err(Self::to_status)?;
self.metadata
.save_object(&object)
.await
.map_err(Self::to_status)?;
self.metadata
.delete_multipart_upload(upload.upload_id.as_str())
.await
.map_err(Self::to_status)?;
drop(_guard);
self.drop_multipart_lock_if_idle(&req.upload_id);
Ok(Response::new(CompleteMultipartUploadResponse {
bucket: req.bucket,
key: req.key,
etag: multipart_etag.as_str().to_string(),
version_id: object.version.as_str().to_string(),
}))
}
async fn abort_multipart_upload(
&self,
request: Request<AbortMultipartUploadRequest>,
) -> Result<Response<()>, Status> {
let tenant = get_tenant_context(&request)?;
let req = request.into_inner();
let bucket = self.load_bucket_for_tenant(&tenant, &req.bucket).await?;
self.authorize_object_action(&tenant, ACTION_OBJECTS_DELETE, &bucket, &req.key)
.await?;
let upload_lock = self.multipart_lock(&req.upload_id);
let _guard = upload_lock.lock().await;
if let Some(upload) = self
.metadata
.load_multipart_upload(&req.upload_id)
.await
.map_err(Self::to_status)?
{
if upload.bucket_id != bucket.id.to_string() || upload.key.as_str() != req.key {
return Err(Status::failed_precondition(
"multipart upload does not match bucket/key",
));
}
self.delete_multipart_parts(&upload).await?;
self.metadata
.delete_multipart_upload(upload.upload_id.as_str())
.await
.map_err(Self::to_status)?;
}
drop(_guard);
self.drop_multipart_lock_if_idle(&req.upload_id);
Ok(Response::new(()))
}
async fn list_parts(
&self,
request: Request<ListPartsRequest>,
) -> Result<Response<ListPartsResponse>, Status> {
let tenant = get_tenant_context(&request)?;
let req = request.into_inner();
let bucket = self.load_bucket_for_tenant(&tenant, &req.bucket).await?;
self.authorize_object_action(&tenant, ACTION_OBJECTS_READ, &bucket, &req.key)
.await?;
let upload = self
.metadata
.load_multipart_upload(&req.upload_id)
.await
.map_err(Self::to_status)?
.ok_or_else(|| Status::not_found("multipart upload not found"))?;
if upload.bucket_id != bucket.id.to_string() || upload.key.as_str() != req.key {
return Err(Status::failed_precondition(
"multipart upload does not match bucket/key",
));
}
let max_parts = if req.max_parts > 0 {
req.max_parts
} else {
1000
};
let remaining_count = upload
.parts
.iter()
.filter(|part| part.part_number.as_u32() > req.part_number_marker)
.count();
let parts = upload
.parts
.iter()
.filter(|part| part.part_number.as_u32() > req.part_number_marker)
.take(max_parts as usize)
.map(|part| PartInfo {
part_number: part.part_number.as_u32(),
etag: part.etag.as_str().to_string(),
size: part.size,
last_modified: Some(prost_types::Timestamp {
seconds: part.last_modified.timestamp(),
nanos: part.last_modified.timestamp_subsec_nanos() as i32,
}),
})
.collect::<Vec<_>>();
let is_truncated = remaining_count > parts.len();
let next_part_number_marker = parts.last().map(|part| part.part_number).unwrap_or(0);
Ok(Response::new(ListPartsResponse {
bucket: req.bucket,
key: req.key,
upload_id: req.upload_id,
parts,
is_truncated,
next_part_number_marker,
}))
}
async fn list_multipart_uploads(
&self,
request: Request<ListMultipartUploadsRequest>,
) -> Result<Response<ListMultipartUploadsResponse>, Status> {
let tenant = get_tenant_context(&request)?;
let req = request.into_inner();
let bucket = self.load_bucket_for_tenant(&tenant, &req.bucket).await?;
self.authorize_object_action(&tenant, ACTION_OBJECTS_LIST, &bucket, &req.prefix)
.await?;
let bucket_id: BucketId = BucketId::from_str(&bucket.id.to_string())
.map_err(|_| Status::internal("Invalid bucket ID"))?;
let max_uploads = if req.max_uploads > 0 {
req.max_uploads
} else {
1000
};
let uploads = self
.metadata
.list_multipart_uploads(&bucket_id, &req.prefix, max_uploads)
.await
.map_err(Self::to_status)?;
Ok(Response::new(ListMultipartUploadsResponse {
bucket: req.bucket,
uploads: uploads
.into_iter()
.map(|upload| MultipartUploadInfo {
key: upload.key.as_str().to_string(),
upload_id: upload.upload_id.as_str().to_string(),
initiated: Some(prost_types::Timestamp {
seconds: upload.initiated.timestamp(),
nanos: upload.initiated.timestamp_subsec_nanos() as i32,
}),
})
.collect(),
common_prefixes: Vec::new(),
is_truncated: false,
next_key_marker: String::new(),
next_upload_id_marker: String::new(),
}))
}
}
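// A minimal unit-test sketch for the pure helpers above. The range
// expectations restate the documented behavior of `resolve_range`; the MD5
// test assumes `ETag::as_str()` renders the digest as lowercase hex.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn resolve_range_defaults_to_full_object() {
        // (0, 0) is the "no range requested" sentinel.
        assert_eq!(ObjectServiceImpl::resolve_range(100, 0, 0), (0, 100));
        // Negative bounds fall back to the full object.
        assert_eq!(ObjectServiceImpl::resolve_range(100, -1, 50), (0, 100));
    }

    #[test]
    fn resolve_range_clamps_to_object_size() {
        // `end` is exclusive and clamped to the total length.
        assert_eq!(ObjectServiceImpl::resolve_range(100, 10, 20), (10, 20));
        assert_eq!(ObjectServiceImpl::resolve_range(100, 10, 500), (10, 100));
        // A start past the end of the object yields an empty range.
        assert_eq!(ObjectServiceImpl::resolve_range(100, 200, 300), (100, 100));
    }

    #[test]
    fn calculate_md5_matches_known_digest() {
        // MD5 of the empty input is d41d8cd98f00b204e9800998ecf8427e.
        let etag = ObjectServiceImpl::calculate_md5(b"");
        assert!(etag.as_str().contains("d41d8cd98f00b204e9800998ecf8427e"));
    }
}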