//! BucketService gRPC implementation use crate::metadata::MetadataStore; use lightningstor_api::proto::{ BucketInfo, CreateBucketRequest, CreateBucketResponse, DeleteBucketPolicyRequest, DeleteBucketRequest, DeleteBucketTaggingRequest, GetBucketPolicyRequest, GetBucketPolicyResponse, GetBucketTaggingRequest, GetBucketTaggingResponse, GetBucketVersioningRequest, GetBucketVersioningResponse, HeadBucketRequest, HeadBucketResponse, ListBucketsRequest, ListBucketsResponse, PutBucketPolicyRequest, PutBucketTaggingRequest, PutBucketVersioningRequest, }; use lightningstor_api::BucketService; use lightningstor_types::{Bucket, BucketName, Result as LightningStorResult}; use std::sync::Arc; use tonic::{Request, Response, Status}; /// BucketService implementation pub struct BucketServiceImpl { /// Metadata store for bucket/object metadata metadata: Arc, } impl BucketServiceImpl { /// Create a new BucketService pub async fn new( // storage: Arc, // Removed metadata: Arc, ) -> LightningStorResult { Ok(Self { metadata }) } /// Convert LightningStor Error to gRPC Status fn to_status(err: lightningstor_types::Error) -> Status { Status::internal(err.to_string()) } /// Convert Bucket to BucketInfo proto fn bucket_to_proto(&self, bucket: &Bucket) -> BucketInfo { BucketInfo { name: bucket.name.as_str().to_string(), id: bucket.id.to_string(), region: bucket.region.clone(), created_at: Some(prost_types::Timestamp { seconds: bucket.created_at.timestamp(), nanos: bucket.created_at.timestamp_subsec_nanos() as i32, }), org_id: bucket.org_id.clone(), project_id: bucket.project_id.clone(), } } } #[tonic::async_trait] impl BucketService for BucketServiceImpl { async fn create_bucket( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); tracing::info!(bucket = %req.bucket, "CreateBucket request"); // Use org_id and project_id from request if provided, else default let org_id = if req.org_id.is_empty() { "default".to_string() } else { req.org_id }; let project_id = if req.project_id.is_empty() { "default".to_string() } else { req.project_id }; // Validate bucket name let bucket_name = BucketName::new(&req.bucket) .map_err(|e| Status::invalid_argument(format!("Invalid bucket name: {}", e)))?; // Check if bucket already exists if let Some(_) = self .metadata .load_bucket(&org_id, &project_id, &req.bucket) .await .map_err(Self::to_status)? { return Err(Status::already_exists(format!( "Bucket {} already exists", req.bucket ))); } // Create bucket let region = if req.region.is_empty() { "default".to_string() } else { req.region.clone() }; let bucket = Bucket::new(bucket_name, &org_id, &project_id, region); // Save bucket metadata self.metadata .save_bucket(&bucket) .await .map_err(Self::to_status)?; tracing::info!( bucket = %req.bucket, bucket_id = %bucket.id, "Bucket created successfully" ); Ok(Response::new(CreateBucketResponse { bucket: Some(self.bucket_to_proto(&bucket)), })) } async fn delete_bucket( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); tracing::info!(bucket = %req.bucket, "DeleteBucket request"); let org_id = "default"; let project_id = "default"; // Load bucket let bucket = self .metadata .load_bucket(org_id, project_id, &req.bucket) .await .map_err(Self::to_status)? .ok_or_else(|| Status::not_found(format!("Bucket {} not found", req.bucket)))?; // TODO: Check if bucket is empty before deleting // Delete bucket metadata self.metadata .delete_bucket(&bucket) .await .map_err(Self::to_status)?; tracing::info!(bucket = %req.bucket, "Bucket deleted successfully"); Ok(Response::new(())) } async fn head_bucket( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); tracing::info!(bucket = %req.bucket, "HeadBucket request"); let org_id = "default"; let project_id = "default"; // Load bucket let bucket = self .metadata .load_bucket(org_id, project_id, &req.bucket) .await .map_err(Self::to_status)? .ok_or_else(|| Status::not_found(format!("Bucket {} not found", req.bucket)))?; Ok(Response::new(HeadBucketResponse { bucket: Some(self.bucket_to_proto(&bucket)), })) } async fn list_buckets( &self, _request: Request, ) -> Result, Status> { tracing::info!("ListBuckets request"); let org_id = "default"; // List all buckets for the org let buckets = self .metadata .list_buckets(org_id, None) .await .map_err(Self::to_status)?; let bucket_infos: Vec = buckets .iter() .map(|b| self.bucket_to_proto(b)) .collect(); Ok(Response::new(ListBucketsResponse { buckets: bucket_infos, is_truncated: false, next_continuation_token: String::new(), })) } async fn get_bucket_versioning( &self, _request: Request, ) -> Result, Status> { Err(Status::unimplemented( "GetBucketVersioning not yet implemented", )) } async fn put_bucket_versioning( &self, _request: Request, ) -> Result, Status> { Err(Status::unimplemented( "PutBucketVersioning not yet implemented", )) } async fn get_bucket_policy( &self, _request: Request, ) -> Result, Status> { Err(Status::unimplemented("GetBucketPolicy not yet implemented")) } async fn put_bucket_policy( &self, _request: Request, ) -> Result, Status> { Err(Status::unimplemented("PutBucketPolicy not yet implemented")) } async fn delete_bucket_policy( &self, _request: Request, ) -> Result, Status> { Err(Status::unimplemented( "DeleteBucketPolicy not yet implemented", )) } async fn get_bucket_tagging( &self, _request: Request, ) -> Result, Status> { Err(Status::unimplemented("GetBucketTagging not yet implemented")) } async fn put_bucket_tagging( &self, _request: Request, ) -> Result, Status> { Err(Status::unimplemented("PutBucketTagging not yet implemented")) } async fn delete_bucket_tagging( &self, _request: Request, ) -> Result, Status> { Err(Status::unimplemented( "DeleteBucketTagging not yet implemented", )) } }