use crate::config::ArtifactStoreConfig;
use crate::storage::ImageUploadPart;
use reqwest::header::LOCATION;
use reqwest::{Client as HttpClient, Url};
use sha2::{Digest, Sha256};
use std::collections::HashSet;
use std::net::IpAddr;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};

use dashmap::{DashMap, DashSet};
use iam_client::client::IamClientConfig;
use iam_client::IamClient;
use iam_types::{PolicyBinding, PrincipalRef, Resource, Scope};
use lightningstor_api::proto::{
    get_object_response, AbortMultipartUploadRequest, CompleteMultipartUploadRequest,
    CompletedPart, CreateBucketRequest, CreateMultipartUploadRequest, DeleteObjectRequest,
    GetObjectRequest, PutObjectRequest, UploadPartRequest,
};
use lightningstor_api::{BucketServiceClient, ObjectServiceClient};
use plasmavmc_types::ImageFormat;
use serde::Deserialize;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::process::Command;
use tokio::task::JoinSet;
use tokio_stream::StreamExt;
use tonic::metadata::MetadataValue;
use tonic::transport::{Channel, Endpoint};
use tonic::{Code, Request, Status};

const MAX_OBJECT_GRPC_MESSAGE_SIZE: usize = 1024 * 1024 * 1024;
const OBJECT_GRPC_INITIAL_STREAM_WINDOW: u32 = 64 * 1024 * 1024;
const OBJECT_GRPC_INITIAL_CONNECTION_WINDOW: u32 = 512 * 1024 * 1024;
const OBJECT_GRPC_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(30);
const OBJECT_GRPC_KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(10);
const MIN_MULTIPART_UPLOAD_PART_SIZE: usize = 8 * 1024 * 1024;
const MAX_MULTIPART_UPLOAD_PART_SIZE: usize = 128 * 1024 * 1024;
const MAX_MULTIPART_UPLOAD_CONCURRENCY: usize = 32;
const MAX_IMPORT_REDIRECTS: usize = 5;
const DEFAULT_HTTP_SEND_TIMEOUT: Duration = Duration::from_secs(15);

#[derive(Clone)]
pub struct ArtifactStore {
    channel: Channel,
    iam_client: Arc<IamClient>,
    http_client: HttpClient,
    image_bucket: String,
    image_cache_dir: PathBuf,
    multipart_upload_concurrency: usize,
    multipart_upload_part_size: usize,
    raw_image_convert_parallelism: usize,
    max_image_import_size_bytes: u64,
    allowed_https_hosts: Arc<HashSet<String>>,
    qemu_img_path: PathBuf,
    project_tokens: Arc<DashMap<String, CachedToken>>,
    ensured_buckets: Arc<DashSet<String>>,
}

pub(crate) struct ImportedImage {
    pub size_bytes: u64,
    pub checksum: String,
    pub format: ImageFormat,
    pub source_type: String,
    pub source_host: Option<String>,
}

#[derive(Deserialize)]
struct QemuImageInfo {
    format: String,
}

struct CachedToken {
    token: String,
    expires_at: Instant,
}

struct ValidatedImportUrl {
    url: Url,
    host: String,
}

struct ImportedImageSource {
    source_type: String,
    host: Option<String>,
}

impl ArtifactStore {
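    /// Builds an `ArtifactStore` from configuration, returning `Ok(None)` when no
    /// `lightningstor_endpoint` is configured so callers can treat the store as an
    /// optional subsystem.
    ///
    /// A minimal wiring sketch; `load_config` is a hypothetical helper and the
    /// IAM endpoint value is an assumption for illustration:
    ///
    /// ```ignore
    /// let config = load_config()?;
    /// let store = ArtifactStore::from_config(&config.artifacts, "http://iam:50051")
    ///     .await?
    ///     .expect("artifact store configured");
    /// ```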
    pub async fn from_config(
        config: &ArtifactStoreConfig,
        iam_endpoint: &str,
    ) -> Result<Option<Self>, Box<dyn std::error::Error>> {
        let Some(raw_endpoint) = config
            .lightningstor_endpoint
            .as_ref()
            .map(|value| value.trim().to_string())
            .filter(|value| !value.is_empty())
        else {
            return Ok(None);
        };
        let endpoint = normalize_endpoint(&raw_endpoint);
        let iam_endpoint = normalize_endpoint(iam_endpoint);
        let mut iam_config = IamClientConfig::new(iam_endpoint.clone()).with_timeout(5000);
        if iam_endpoint.starts_with("http://") {
            iam_config = iam_config.without_tls();
        }
        let iam_client = Arc::new(IamClient::connect(iam_config).await?);
        let channel = Endpoint::from_shared(endpoint.clone())?
            .tcp_nodelay(true)
            .initial_stream_window_size(OBJECT_GRPC_INITIAL_STREAM_WINDOW)
            .initial_connection_window_size(OBJECT_GRPC_INITIAL_CONNECTION_WINDOW)
            .http2_keep_alive_interval(OBJECT_GRPC_KEEPALIVE_INTERVAL)
            .keep_alive_timeout(OBJECT_GRPC_KEEPALIVE_TIMEOUT)
            .connect_lazy();
        let image_cache_dir = config.image_cache_dir.clone();
        tokio::fs::create_dir_all(&image_cache_dir).await?;
        ensure_cache_dir_permissions(&image_cache_dir).await?;
        let http_client = HttpClient::builder()
            .connect_timeout(Duration::from_secs(
                config.image_import_connect_timeout_secs.max(1),
            ))
            .timeout(Duration::from_secs(config.image_import_timeout_secs.max(1)))
            // Redirects are followed manually in download_https_source so each
            // hop can be re-validated.
            .redirect(reqwest::redirect::Policy::none())
            .build()?;
        let allowed_https_hosts = config
            .allowed_https_hosts
            .iter()
            .map(|host| host.trim().to_ascii_lowercase())
            .filter(|host| !host.is_empty())
            .collect::<HashSet<_>>();
        let qemu_img_path = resolve_binary_path(config.qemu_img_path.as_deref(), "qemu-img")?;
        Ok(Some(Self {
            channel,
            iam_client,
            http_client,
            image_bucket: config.image_bucket.clone(),
            image_cache_dir,
            multipart_upload_concurrency: config
                .multipart_upload_concurrency
                .clamp(1, MAX_MULTIPART_UPLOAD_CONCURRENCY),
            multipart_upload_part_size: config.multipart_upload_part_size.clamp(
                MIN_MULTIPART_UPLOAD_PART_SIZE,
                MAX_MULTIPART_UPLOAD_PART_SIZE,
            ),
            raw_image_convert_parallelism: config.raw_image_convert_parallelism.clamp(1, 64),
            max_image_import_size_bytes: config.max_image_import_size_bytes.max(1),
            allowed_https_hosts: Arc::new(allowed_https_hosts),
            qemu_img_path,
            project_tokens: Arc::new(DashMap::new()),
            ensured_buckets: Arc::new(DashSet::new()),
        }))
    }

    pub(crate) async fn import_image(
        &self,
        org_id: &str,
        project_id: &str,
        image_id: &str,
        source_url: &str,
        source_format: ImageFormat,
    ) -> Result<ImportedImage, Status> {
        let token = self.issue_project_token(org_id, project_id).await?;
        self.ensure_bucket(&self.image_bucket, org_id, project_id, &token)
            .await?;
        let staging_path = self.staging_path(image_id)?;
        let ImportedImageSource { source_type, host } =
            self.materialize_source(source_url, &staging_path).await?;
        self.process_staged_image(
            org_id,
            project_id,
            image_id,
            &token,
            &staging_path,
            source_format,
            source_type,
            host,
        )
        .await
    }

    pub async fn materialize_image_cache(
        &self,
        org_id: &str,
        project_id: &str,
        image_id: &str,
    ) -> Result<PathBuf, Status> {
        let token = self.issue_project_token(org_id, project_id).await?;
        self.ensure_bucket(&self.image_bucket, org_id, project_id, &token)
            .await?;
        let image_key = image_object_key(org_id, project_id, image_id)?;
        let image_path = self.image_path(image_id)?;
        self.download_object_to_file(&self.image_bucket, &image_key, &image_path, &token)
            .await?;
        Ok(image_path)
    }

    pub async fn materialize_raw_image_cache(
        &self,
        org_id: &str,
        project_id: &str,
        image_id: &str,
    ) -> Result<PathBuf, Status> {
        let image_path = self
            .materialize_image_cache(org_id, project_id, image_id)
            .await?;
        let raw_path = self.raw_image_path(image_id)?;
        self.convert_to_raw(&image_path, &raw_path).await?;
        Ok(raw_path)
    }
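
    /// Deletes the image object from the bucket, treating `NotFound` as success,
    /// then removes any cached qcow2 and raw copies so the call is safe to retry.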
    pub(crate) async fn delete_image(
        &self,
        org_id: &str,
        project_id: &str,
        image_id: &str,
    ) -> Result<(), Status> {
        let token = self.issue_project_token(org_id, project_id).await?;
        let image_key = image_object_key(org_id, project_id, image_id)?;
        let mut client = self.object_client().await?;
        let mut request = Request::new(DeleteObjectRequest {
            bucket: self.image_bucket.clone(),
            key: image_key,
            version_id: String::new(),
        });
        attach_bearer(&mut request, &token)?;
        match client.delete_object(request).await {
            Ok(_) => {}
            Err(status) if status.code() == Code::NotFound => {}
            Err(status) => return Err(Status::from_error(Box::new(status))),
        }
        let image_path = self.image_path(image_id)?;
        if tokio::fs::try_exists(&image_path).await.map_err(|e| {
            Status::internal(format!("failed to inspect {}: {e}", image_path.display()))
        })? {
            tokio::fs::remove_file(&image_path).await.map_err(|e| {
                Status::internal(format!("failed to remove {}: {e}", image_path.display()))
            })?;
        }
        let raw_path = self.raw_image_path(image_id)?;
        if tokio::fs::try_exists(&raw_path).await.map_err(|e| {
            Status::internal(format!("failed to inspect {}: {e}", raw_path.display()))
        })? {
            tokio::fs::remove_file(&raw_path).await.map_err(|e| {
                Status::internal(format!("failed to remove {}: {e}", raw_path.display()))
            })?;
        }
        Ok(())
    }

    pub fn minimum_upload_part_size(&self) -> u32 {
        self.multipart_upload_part_size as u32
    }

    pub(crate) async fn begin_image_upload(
        &self,
        org_id: &str,
        project_id: &str,
        image_id: &str,
    ) -> Result<(String, String), Status> {
        let token = self.issue_project_token(org_id, project_id).await?;
        self.ensure_bucket(&self.image_bucket, org_id, project_id, &token)
            .await?;
        let staging_key = staging_object_key(org_id, project_id, image_id)?;
        let mut client = self.object_client().await?;
        let mut request = Request::new(CreateMultipartUploadRequest {
            bucket: self.image_bucket.clone(),
            key: staging_key.clone(),
            metadata: None,
        });
        attach_bearer(&mut request, &token)?;
        let upload_id = client
            .create_multipart_upload(request)
            .await
            .map_err(|status| Status::from_error(Box::new(status)))?
            .into_inner()
            .upload_id;
        Ok((upload_id, staging_key))
    }

    pub(crate) async fn upload_image_part(
        &self,
        org_id: &str,
        project_id: &str,
        staging_key: &str,
        upload_id: &str,
        part_number: u32,
        body: Vec<u8>,
    ) -> Result<ImageUploadPart, Status> {
        if part_number == 0 {
            return Err(Status::invalid_argument(
                "part_number must be greater than zero",
            ));
        }
        if body.is_empty() {
            return Err(Status::invalid_argument(
                "upload part body must not be empty",
            ));
        }
        let token = self.issue_project_token(org_id, project_id).await?;
        let mut client = self.object_client().await?;
        // Record the size before the body buffer is moved into the request.
        let size_bytes = body.len() as u64;
        let request_stream = tokio_stream::iter(vec![UploadPartRequest {
            bucket: self.image_bucket.clone(),
            key: staging_key.to_string(),
            upload_id: upload_id.to_string(),
            part_number,
            body: body.into(),
            content_md5: String::new(),
        }]);
        let mut request = Request::new(request_stream);
        attach_bearer(&mut request, &token)?;
        let response = client
            .upload_part(request)
            .await
            .map_err(|status| Status::from_error(Box::new(status)))?;
        Ok(ImageUploadPart {
            part_number,
            etag: response.into_inner().etag,
            size_bytes,
        })
    }
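
    /// Finalizes a client-driven multipart upload session: completes the staged
    /// object, pulls it down into the local staging path, normalizes it to qcow2
    /// via `process_staged_image`, and removes the staging object afterwards.
    ///
    /// A sketch of a full session, assuming `store`, the identifiers, and a
    /// `chunk` of at least `minimum_upload_part_size()` bytes come from the caller:
    ///
    /// ```ignore
    /// let (upload_id, staging_key) = store.begin_image_upload(org, project, image).await?;
    /// let part = store
    ///     .upload_image_part(org, project, &staging_key, &upload_id, 1, chunk)
    ///     .await?;
    /// let imported = store
    ///     .complete_image_upload(org, project, image, &staging_key, &upload_id, &[part], ImageFormat::Qcow2)
    ///     .await?;
    /// ```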
    pub(crate) async fn complete_image_upload(
        &self,
        org_id: &str,
        project_id: &str,
        image_id: &str,
        staging_key: &str,
        upload_id: &str,
        parts: &[ImageUploadPart],
        source_format: ImageFormat,
    ) -> Result<ImportedImage, Status> {
        if parts.is_empty() {
            return Err(Status::failed_precondition(
                "upload session does not contain any parts",
            ));
        }
        let token = self.issue_project_token(org_id, project_id).await?;
        self.ensure_bucket(&self.image_bucket, org_id, project_id, &token)
            .await?;
        let mut sorted_parts: Vec<CompletedPart> = parts
            .iter()
            .map(|part| CompletedPart {
                part_number: part.part_number,
                etag: part.etag.clone(),
            })
            .collect();
        sorted_parts.sort_by_key(|part| part.part_number);
        let mut client = self.object_client().await?;
        let mut complete_request = Request::new(CompleteMultipartUploadRequest {
            bucket: self.image_bucket.clone(),
            key: staging_key.to_string(),
            upload_id: upload_id.to_string(),
            parts: sorted_parts,
        });
        attach_bearer(&mut complete_request, &token)?;
        client
            .complete_multipart_upload(complete_request)
            .await
            .map_err(|status| Status::from_error(Box::new(status)))?;
        let staging_path = self.staging_path(image_id)?;
        if tokio::fs::try_exists(&staging_path).await.unwrap_or(false) {
            let _ = tokio::fs::remove_file(&staging_path).await;
        }
        self.download_object_to_file(&self.image_bucket, staging_key, &staging_path, &token)
            .await?;
        let result = self
            .process_staged_image(
                org_id,
                project_id,
                image_id,
                &token,
                &staging_path,
                source_format,
                "upload".to_string(),
                None,
            )
            .await;
        let _ = self
            .delete_object_ignore_not_found(&self.image_bucket, staging_key, &token)
            .await;
        result
    }

    pub(crate) async fn abort_image_upload(
        &self,
        org_id: &str,
        project_id: &str,
        staging_key: &str,
        upload_id: &str,
    ) -> Result<(), Status> {
        let token = self.issue_project_token(org_id, project_id).await?;
        self.abort_multipart_upload(&self.image_bucket, staging_key, upload_id, &token)
            .await
    }

    async fn ensure_bucket(
        &self,
        bucket: &str,
        org_id: &str,
        project_id: &str,
        token: &str,
    ) -> Result<(), Status> {
        let bucket_key = format!("{org_id}/{project_id}/{bucket}");
        if self.ensured_buckets.contains(&bucket_key) {
            return Ok(());
        }
        let mut client = self.bucket_client().await?;
        let mut request = Request::new(CreateBucketRequest {
            bucket: bucket.to_string(),
            region: "default".to_string(),
            org_id: org_id.to_string(),
            project_id: project_id.to_string(),
        });
        attach_bearer(&mut request, token)?;
        match client.create_bucket(request).await {
            Ok(_) => {
                self.ensured_buckets.insert(bucket_key);
                Ok(())
            }
            Err(status) if status.code() == Code::AlreadyExists => {
                self.ensured_buckets.insert(bucket_key);
                Ok(())
            }
            Err(status) => Err(Status::from_error(Box::new(status))),
        }
    }

    async fn upload_file(
        &self,
        bucket: &str,
        key: &str,
        path: &Path,
        token: &str,
    ) -> Result<(), Status> {
        let metadata = tokio::fs::metadata(path)
            .await
            .map_err(|e| Status::internal(format!("failed to stat {path:?}: {e}")))?;
        let multipart_part_size = self.multipart_upload_part_size;
        if metadata.len() > multipart_part_size as u64 {
            return self
                .upload_file_multipart(bucket, key, path, token, metadata.len())
                .await;
        }
        self.upload_file_direct(bucket, key, path, token).await
    }

    async fn upload_file_direct(
        &self,
        bucket: &str,
        key: &str,
        path: &Path,
        token: &str,
    ) -> Result<(), Status> {
        let started = Instant::now();
        let body = tokio::fs::read(path)
            .await
            .map_err(|e| Status::internal(format!("failed to read {path:?}: {e}")))?;
        tracing::info!(
            bucket = bucket,
            key = key,
            path = %path.display(),
            bytes = body.len(),
            "Uploading artifact object"
        );
        let mut client = self.object_client().await?;
        let mut request = Request::new(PutObjectRequest {
            bucket: bucket.to_string(),
            key: key.to_string(),
            body: body.into(),
            metadata: None,
            content_md5: String::new(),
            if_none_match: String::new(),
        });
        attach_bearer(&mut request, token)?;
        client
            .put_object(request)
            .await
            .map_err(|status| Status::from_error(Box::new(status)))?;
        tracing::info!(
            bucket = bucket,
            key = key,
            path = %path.display(),
            elapsed_ms = started.elapsed().as_millis() as u64,
            "Finished uploading artifact object"
        );
        Ok(())
    }
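
    /// Streams a large file as a multipart upload, keeping at most
    /// `multipart_upload_concurrency` part uploads in flight and aborting the
    /// upload on the first failed part. A file of `size_bytes` yields
    /// `size_bytes.div_ceil(part_size)` parts; for example, 1 GiB with 64 MiB
    /// parts uploads as 16 parts.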
    async fn upload_file_multipart(
        &self,
        bucket: &str,
        key: &str,
        path: &Path,
        token: &str,
        size_bytes: u64,
    ) -> Result<(), Status> {
        let started = Instant::now();
        let multipart_part_size = self.multipart_upload_part_size;
        tracing::info!(
            bucket = bucket,
            key = key,
            path = %path.display(),
            size_bytes,
            part_size = multipart_part_size,
            "Uploading artifact object with multipart upload"
        );
        let mut client = self.object_client().await?;
        let mut create_request = Request::new(CreateMultipartUploadRequest {
            bucket: bucket.to_string(),
            key: key.to_string(),
            metadata: None,
        });
        attach_bearer(&mut create_request, token)?;
        let upload_id = client
            .create_multipart_upload(create_request)
            .await
            .map_err(|status| Status::from_error(Box::new(status)))?
            .into_inner()
            .upload_id;
        let mut file = tokio::fs::File::open(path)
            .await
            .map_err(|e| Status::internal(format!("failed to open {path:?}: {e}")))?;
        let mut part_number = 1u32;
        let mut completed_parts = Vec::new();
        let mut uploads = JoinSet::new();
        let upload_concurrency = self.multipart_upload_concurrency;
        let enqueue_part_upload = |uploads: &mut JoinSet<Result<CompletedPart, Status>>,
                                   client: &ObjectServiceClient<Channel>,
                                   part_number: u32,
                                   chunk: Vec<u8>| {
            let mut client = client.clone();
            let bucket = bucket.to_string();
            let key = key.to_string();
            let upload_id = upload_id.clone();
            let token = token.to_string();
            uploads.spawn(async move {
                let request_stream = tokio_stream::iter(vec![UploadPartRequest {
                    bucket,
                    key,
                    upload_id,
                    part_number,
                    body: chunk.into(),
                    content_md5: String::new(),
                }]);
                let mut request = Request::new(request_stream);
                attach_bearer(&mut request, &token)?;
                let response = client
                    .upload_part(request)
                    .await
                    .map_err(|status| Status::from_error(Box::new(status)))?;
                Ok(CompletedPart {
                    part_number,
                    etag: response.into_inner().etag,
                })
            });
        };
        loop {
            // Fill a full part buffer unless the file ends first.
            let mut chunk = vec![0u8; multipart_part_size];
            let mut bytes_read = 0usize;
            while bytes_read < chunk.len() {
                let read_now = file
                    .read(&mut chunk[bytes_read..])
                    .await
                    .map_err(|e| Status::internal(format!("failed to read {path:?}: {e}")))?;
                if read_now == 0 {
                    break;
                }
                bytes_read += read_now;
            }
            if bytes_read == 0 {
                break;
            }
            chunk.truncate(bytes_read);
            enqueue_part_upload(&mut uploads, &client, part_number, chunk);
            part_number += 1;
            // Apply backpressure once the in-flight window is full.
            if uploads.len() >= upload_concurrency {
                match next_uploaded_part(&mut uploads).await {
                    Ok(part) => completed_parts.push(part),
                    Err(status) => {
                        uploads.abort_all();
                        while uploads.join_next().await.is_some() {}
                        let _ = self
                            .abort_multipart_upload(bucket, key, &upload_id, token)
                            .await;
                        return Err(status);
                    }
                }
            }
        }
        while !uploads.is_empty() {
            match next_uploaded_part(&mut uploads).await {
                Ok(part) => completed_parts.push(part),
                Err(status) => {
                    uploads.abort_all();
                    while uploads.join_next().await.is_some() {}
                    let _ = self
                        .abort_multipart_upload(bucket, key, &upload_id, token)
                        .await;
                    return Err(status);
                }
            }
        }
        if completed_parts.is_empty() {
            let _ = self
                .abort_multipart_upload(bucket, key, &upload_id, token)
                .await;
            return self.upload_file_direct(bucket, key, path, token).await;
        }
        completed_parts.sort_by_key(|part| part.part_number);
        let mut complete_request = Request::new(CompleteMultipartUploadRequest {
            bucket: bucket.to_string(),
            key: key.to_string(),
            upload_id: upload_id.clone(),
            parts: completed_parts,
        });
        attach_bearer(&mut complete_request, token)?;
        if let Err(status) = client.complete_multipart_upload(complete_request).await {
            let _ = self
                .abort_multipart_upload(bucket, key, &upload_id, token)
                .await;
            return Err(Status::from_error(Box::new(status)));
        }
        tracing::info!(
            bucket = bucket,
            key = key,
            path = %path.display(),
            size_bytes,
            part_count = part_number - 1,
            elapsed_ms = started.elapsed().as_millis() as u64,
            "Finished multipart artifact upload"
        );
        Ok(())
    }
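
    /// Downloads an object into the local cache, returning early on a cache hit.
    /// The body streams into a `.download` temp file that is renamed into place
    /// only once fully written, so readers never observe a partial image.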
    async fn download_object_to_file(
        &self,
        bucket: &str,
        key: &str,
        path: &Path,
        token: &str,
    ) -> Result<(), Status> {
        if tokio::fs::try_exists(path)
            .await
            .map_err(|e| Status::internal(format!("failed to inspect cache path {path:?}: {e}")))?
        {
            let size_bytes = cached_file_size(path).await?;
            tracing::info!(
                bucket = bucket,
                key = key,
                path = %path.display(),
                size_bytes,
                "Artifact cache hit"
            );
            return Ok(());
        }
        if let Some(parent) = path.parent() {
            tokio::fs::create_dir_all(parent)
                .await
                .map_err(|e| Status::internal(format!("failed to create {parent:?}: {e}")))?;
        }
        let started = Instant::now();
        tracing::info!(
            bucket = bucket,
            key = key,
            path = %path.display(),
            "Downloading artifact object into local cache"
        );
        let mut client = self.object_client().await?;
        let mut request = Request::new(GetObjectRequest {
            bucket: bucket.to_string(),
            key: key.to_string(),
            version_id: String::new(),
            range_start: -1,
            range_end: -1,
            if_match: String::new(),
            if_none_match: String::new(),
            if_modified_since: None,
            if_unmodified_since: None,
        });
        attach_bearer(&mut request, token)?;
        let mut stream = client
            .get_object(request)
            .await
            .map_err(|status| Status::from_error(Box::new(status)))?
            .into_inner();
        let temp_path = path.with_extension("download");
        let mut file = tokio::fs::File::create(&temp_path)
            .await
            .map_err(|e| Status::internal(format!("failed to create {temp_path:?}: {e}")))?;
        let mut bytes_written: u64 = 0;
        while let Some(item) = stream.next().await {
            let response = item.map_err(|status| Status::from_error(Box::new(status)))?;
            if let Some(get_object_response::Content::BodyChunk(chunk)) = response.content {
                bytes_written += chunk.len() as u64;
                file.write_all(&chunk).await.map_err(|e| {
                    Status::internal(format!("failed to write {temp_path:?}: {e}"))
                })?;
            }
        }
        file.flush()
            .await
            .map_err(|e| Status::internal(format!("failed to flush {temp_path:?}: {e}")))?;
        drop(file);
        tokio::fs::rename(&temp_path, path)
            .await
            .map_err(|e| Status::internal(format!("failed to move object into cache: {e}")))?;
        ensure_cache_file_permissions(path).await?;
        tracing::info!(
            bucket = bucket,
            key = key,
            path = %path.display(),
            bytes_written,
            elapsed_ms = started.elapsed().as_millis() as u64,
            "Finished downloading artifact object"
        );
        Ok(())
    }
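
    /// Fetches the import source into `path`, dispatching on the URL shape:
    /// `file://` URLs and absolute paths are copied locally, everything else must
    /// pass HTTPS validation before being downloaded.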
    async fn materialize_source(
        &self,
        source_url: &str,
        path: &Path,
    ) -> Result<ImportedImageSource, Status> {
        if let Some(parent) = path.parent() {
            tokio::fs::create_dir_all(parent)
                .await
                .map_err(|e| Status::internal(format!("failed to create {parent:?}: {e}")))?;
        }
        if let Some(local_path) = local_source_path(source_url)? {
            self.copy_local_source(&local_path, path).await?;
            return Ok(ImportedImageSource {
                source_type: "file".to_string(),
                host: None,
            });
        }
        let validated = self.validate_import_url(source_url).await?;
        self.download_https_source(&validated, path).await?;
        Ok(ImportedImageSource {
            source_type: "https".to_string(),
            host: Some(validated.host),
        })
    }

    async fn copy_local_source(&self, source_path: &Path, path: &Path) -> Result<(), Status> {
        let source = tokio::fs::canonicalize(source_path).await.map_err(|e| {
            Status::invalid_argument(format!(
                "failed to access local source_url path {}: {e}",
                source_path.display()
            ))
        })?;
        let metadata = tokio::fs::metadata(&source).await.map_err(|e| {
            Status::invalid_argument(format!(
                "failed to stat local source_url path {}: {e}",
                source.display()
            ))
        })?;
        if !metadata.is_file() {
            return Err(Status::invalid_argument(format!(
                "local source_url path {} is not a regular file",
                source.display()
            )));
        }
        if metadata.len() > self.max_image_import_size_bytes {
            return Err(Status::resource_exhausted(format!(
                "local source_url exceeds the configured maximum size of {} bytes",
                self.max_image_import_size_bytes
            )));
        }
        let temp_path = path.with_extension("local");
        if tokio::fs::try_exists(&temp_path).await.unwrap_or(false) {
            let _ = tokio::fs::remove_file(&temp_path).await;
        }
        tokio::fs::copy(&source, &temp_path).await.map_err(|e| {
            Status::internal(format!(
                "failed to copy local source_url {} into {}: {e}",
                source.display(),
                temp_path.display()
            ))
        })?;
        tokio::fs::rename(&temp_path, path).await.map_err(|e| {
            Status::internal(format!(
                "failed to finalize local source_url copy into {}: {e}",
                path.display()
            ))
        })?;
        ensure_cache_file_permissions(path).await?;
        Ok(())
    }

    async fn convert_to_qcow2(&self, source: &Path, destination: &Path) -> Result<(), Status> {
        if tokio::fs::try_exists(destination)
            .await
            .map_err(|e| Status::internal(format!("failed to inspect {destination:?}: {e}")))?
        {
            return Ok(());
        }
        if let Some(parent) = destination.parent() {
            tokio::fs::create_dir_all(parent)
                .await
                .map_err(|e| Status::internal(format!("failed to create {parent:?}: {e}")))?;
        }
        let parallelism = self.raw_image_convert_parallelism.to_string();
        let args = qemu_img_convert_to_qcow2_args(source, destination, &parallelism);
        let status = Command::new(&self.qemu_img_path)
            .args(args.iter().map(String::as_str))
            .status()
            .await
            .map_err(|e| Status::internal(format!("failed to spawn qemu-img convert: {e}")))?;
        if status.success() {
            ensure_cache_file_permissions(destination).await?;
            Ok(())
        } else {
            Err(Status::internal(format!(
                "qemu-img convert failed for {} with status {status}",
                source.display()
            )))
        }
    }
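
    /// Converts a cached qcow2 image to raw with `qemu-img convert`, skipping the
    /// work when the destination already exists. The invocation mirrors
    /// `qemu_img_convert_to_raw_args`, e.g. with parallelism 8:
    ///
    /// ```text
    /// qemu-img convert -t none -T none -m 8 -W -O raw image.qcow2 image.raw
    /// ```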
    async fn convert_to_raw(&self, source: &Path, destination: &Path) -> Result<(), Status> {
        if tokio::fs::try_exists(destination)
            .await
            .map_err(|e| Status::internal(format!("failed to inspect {destination:?}: {e}")))?
        {
            return Ok(());
        }
        if let Some(parent) = destination.parent() {
            tokio::fs::create_dir_all(parent)
                .await
                .map_err(|e| Status::internal(format!("failed to create {parent:?}: {e}")))?;
        }
        let parallelism = self.raw_image_convert_parallelism.to_string();
        let args = qemu_img_convert_to_raw_args(source, destination, &parallelism);
        let status = Command::new(&self.qemu_img_path)
            .args(args.iter().map(String::as_str))
            .status()
            .await
            .map_err(|e| Status::internal(format!("failed to spawn qemu-img convert: {e}")))?;
        if status.success() {
            ensure_cache_file_permissions(destination).await?;
            Ok(())
        } else {
            Err(Status::internal(format!(
                "qemu-img convert to raw failed for {} with status {status}",
                source.display()
            )))
        }
    }

    async fn can_reuse_qcow2_source(
        &self,
        path: &Path,
        source_format: ImageFormat,
    ) -> Result<bool, Status> {
        if source_format != ImageFormat::Qcow2 {
            return Ok(false);
        }
        let output = Command::new(&self.qemu_img_path)
            .args(["info", "--output", "json", path.to_string_lossy().as_ref()])
            .output()
            .await
            .map_err(|e| Status::internal(format!("failed to spawn qemu-img info: {e}")))?;
        if !output.status.success() {
            return Ok(false);
        }
        let info: QemuImageInfo = serde_json::from_slice(&output.stdout)
            .map_err(|e| Status::internal(format!("failed to parse qemu-img info output: {e}")))?;
        Ok(info.format == "qcow2")
    }

    async fn sha256sum(&self, path: &Path) -> Result<String, Status> {
        let mut file = tokio::fs::File::open(path)
            .await
            .map_err(|e| Status::internal(format!("failed to open {}: {e}", path.display())))?;
        let mut digest = Sha256::new();
        let mut buffer = vec![0u8; 1024 * 1024];
        loop {
            let read_now = file
                .read(&mut buffer)
                .await
                .map_err(|e| Status::internal(format!("failed to read {}: {e}", path.display())))?;
            if read_now == 0 {
                break;
            }
            digest.update(&buffer[..read_now]);
        }
        Ok(format!("{:x}", digest.finalize()))
    }
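
    /// Normalizes a staged source into the canonical cached qcow2: the staging
    /// file is moved into place when `qemu-img info` confirms it is already
    /// qcow2, converted otherwise, then hashed with SHA-256 and uploaded to the
    /// image bucket.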
    async fn process_staged_image(
        &self,
        org_id: &str,
        project_id: &str,
        image_id: &str,
        token: &str,
        staging_path: &Path,
        source_format: ImageFormat,
        source_type: String,
        source_host: Option<String>,
    ) -> Result<ImportedImage, Status> {
        let image_path = self.image_path(image_id)?;
        if self
            .can_reuse_qcow2_source(staging_path, source_format)
            .await?
        {
            if tokio::fs::try_exists(&image_path).await.map_err(|e| {
                Status::internal(format!("failed to inspect {}: {e}", image_path.display()))
            })? {
                let _ = tokio::fs::remove_file(staging_path).await;
            } else {
                tokio::fs::rename(staging_path, &image_path)
                    .await
                    .map_err(|e| {
                        Status::internal(format!(
                            "failed to move qcow2 image {} into cache {}: {e}",
                            staging_path.display(),
                            image_path.display()
                        ))
                    })?;
                ensure_cache_file_permissions(&image_path).await?;
            }
        } else {
            self.convert_to_qcow2(staging_path, &image_path).await?;
            let _ = tokio::fs::remove_file(staging_path).await;
        }
        let checksum = self.sha256sum(&image_path).await?;
        let metadata = tokio::fs::metadata(&image_path).await.map_err(|e| {
            Status::internal(format!("failed to stat {}: {e}", image_path.display()))
        })?;
        let image_key = image_object_key(org_id, project_id, image_id)?;
        self.upload_file(&self.image_bucket, &image_key, &image_path, token)
            .await?;
        Ok(ImportedImage {
            size_bytes: metadata.len(),
            checksum,
            format: ImageFormat::Qcow2,
            source_type,
            source_host,
        })
    }

    async fn validate_import_url(&self, source_url: &str) -> Result<ValidatedImportUrl, Status> {
        let url = Url::parse(source_url)
            .map_err(|e| Status::invalid_argument(format!("invalid source_url: {e}")))?;
        if url.scheme() != "https" {
            return Err(Status::invalid_argument("source_url must use https://"));
        }
        if !url.username().is_empty() || url.password().is_some() {
            return Err(Status::invalid_argument(
                "source_url must not include embedded credentials",
            ));
        }
        let host = url
            .host_str()
            .map(str::to_ascii_lowercase)
            .ok_or_else(|| Status::invalid_argument("source_url must include a host"))?;
        if host == "localhost" {
            return Err(Status::invalid_argument(
                "source_url host must not target loopback or local interfaces",
            ));
        }
        if !self.allowed_https_hosts.is_empty() && !self.allowed_https_hosts.contains(&host) {
            return Err(Status::permission_denied(format!(
                "source_url host {host} is not in artifacts.allowed_https_hosts",
            )));
        }
        let port = url.port_or_known_default().unwrap_or(443);
        let resolved = tokio::net::lookup_host((host.as_str(), port))
            .await
            .map_err(|e| Status::unavailable(format!("failed to resolve source_url host: {e}")))?;
        let mut found_any = false;
        for addr in resolved {
            found_any = true;
            let ip = addr.ip();
            if !is_public_ip(ip) {
                return Err(Status::permission_denied(format!(
                    "source_url resolved to a non-public address: {ip}",
                )));
            }
        }
        if !found_any {
            return Err(Status::failed_precondition(
                "source_url host did not resolve to any public addresses",
            ));
        }
        Ok(ValidatedImportUrl { url, host })
    }
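
    /// Downloads a validated HTTPS source with manual redirect handling: every
    /// redirect target is re-validated against the host allow-list and public-IP
    /// checks, at most `MAX_IMPORT_REDIRECTS` hops are followed, and the transfer
    /// is aborted as soon as the body exceeds `max_image_import_size_bytes`.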
    async fn download_https_source(
        &self,
        source: &ValidatedImportUrl,
        path: &Path,
    ) -> Result<(), Status> {
        let temp_path = path.with_extension("download");
        if tokio::fs::try_exists(&temp_path).await.unwrap_or(false) {
            let _ = tokio::fs::remove_file(&temp_path).await;
        }
        let mut current = source.url.clone();
        let mut redirects_remaining = MAX_IMPORT_REDIRECTS;
        loop {
            let response = tokio::time::timeout(
                DEFAULT_HTTP_SEND_TIMEOUT,
                self.http_client.get(current.clone()).send(),
            )
            .await
            .map_err(|_| Status::deadline_exceeded("timed out waiting for source_url response"))?
            .map_err(|e| Status::unavailable(format!("failed to download image source: {e}")))?;
            if response.status().is_redirection() {
                if redirects_remaining == 0 {
                    return Err(Status::failed_precondition(
                        "source_url redirect limit exceeded",
                    ));
                }
                let location = response
                    .headers()
                    .get(LOCATION)
                    .ok_or_else(|| {
                        Status::failed_precondition(
                            "source_url redirect response did not include a Location header",
                        )
                    })?
                    .to_str()
                    .map_err(|e| {
                        Status::failed_precondition(format!(
                            "invalid redirect Location header in source_url response: {e}"
                        ))
                    })?;
                current = current.join(location).map_err(|e| {
                    Status::failed_precondition(format!(
                        "failed to resolve redirect target from source_url: {e}"
                    ))
                })?;
                // Each redirect hop is validated like the original URL.
                self.validate_import_url(current.as_ref()).await?;
                redirects_remaining -= 1;
                continue;
            }
            if !response.status().is_success() {
                return Err(Status::failed_precondition(format!(
                    "image download failed with HTTP {}",
                    response.status()
                )));
            }
            if let Some(content_length) = response.content_length() {
                if content_length > self.max_image_import_size_bytes {
                    return Err(Status::resource_exhausted(format!(
                        "image download exceeds the configured maximum size of {} bytes",
                        self.max_image_import_size_bytes
                    )));
                }
            }
            let mut file = tokio::fs::File::create(&temp_path).await.map_err(|e| {
                Status::internal(format!(
                    "failed to create downloaded image {}: {e}",
                    temp_path.display()
                ))
            })?;
            let mut response = response;
            let mut downloaded = 0u64;
            while let Some(chunk) = response.chunk().await.map_err(|e| {
                Status::unavailable(format!("failed to read image response body: {e}"))
            })? {
                downloaded = downloaded.saturating_add(chunk.len() as u64);
                // The Content-Length check above is advisory; enforce the cap on
                // the bytes actually received as well.
                if downloaded > self.max_image_import_size_bytes {
                    let _ = tokio::fs::remove_file(&temp_path).await;
                    return Err(Status::resource_exhausted(format!(
                        "image download exceeds the configured maximum size of {} bytes",
                        self.max_image_import_size_bytes
                    )));
                }
                file.write_all(&chunk).await.map_err(|e| {
                    Status::internal(format!(
                        "failed to write downloaded image {}: {e}",
                        temp_path.display()
                    ))
                })?;
            }
            file.flush().await.map_err(|e| {
                Status::internal(format!(
                    "failed to flush downloaded image {}: {e}",
                    temp_path.display()
                ))
            })?;
            drop(file);
            tokio::fs::rename(&temp_path, path).await.map_err(|e| {
                Status::internal(format!(
                    "failed to finalize downloaded image {}: {e}",
                    path.display()
                ))
            })?;
            ensure_cache_file_permissions(path).await?;
            return Ok(());
        }
    }

    async fn delete_object_ignore_not_found(
        &self,
        bucket: &str,
        key: &str,
        token: &str,
    ) -> Result<(), Status> {
        let mut client = self.object_client().await?;
        let mut request = Request::new(DeleteObjectRequest {
            bucket: bucket.to_string(),
            key: key.to_string(),
            version_id: String::new(),
        });
        attach_bearer(&mut request, token)?;
        match client.delete_object(request).await {
            Ok(_) => Ok(()),
            Err(status) if status.code() == Code::NotFound => Ok(()),
            Err(status) => Err(Status::from_error(Box::new(status))),
        }
    }

    async fn bucket_client(&self) -> Result<BucketServiceClient<Channel>, Status> {
        Ok(BucketServiceClient::new(self.channel.clone())
            .max_decoding_message_size(MAX_OBJECT_GRPC_MESSAGE_SIZE)
            .max_encoding_message_size(MAX_OBJECT_GRPC_MESSAGE_SIZE))
    }

    async fn object_client(&self) -> Result<ObjectServiceClient<Channel>, Status> {
        Ok(ObjectServiceClient::new(self.channel.clone())
            .max_decoding_message_size(MAX_OBJECT_GRPC_MESSAGE_SIZE)
            .max_encoding_message_size(MAX_OBJECT_GRPC_MESSAGE_SIZE))
    }
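
    /// Issues a project-scoped IAM token, bootstrapping the per-project service
    /// account and its `roles/ProjectAdmin` binding on first use. Tokens are
    /// issued for one hour and cached for 55 minutes, and a cached token is only
    /// reused while it still has more than 60 seconds of validity left.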
    async fn issue_project_token(&self, org_id: &str, project_id: &str) -> Result<String, Status> {
        let cache_key = format!("{org_id}/{project_id}");
        if let Some(cached) = self.project_tokens.get(&cache_key) {
            if cached.expires_at > Instant::now() + Duration::from_secs(60) {
                return Ok(cached.token.clone());
            }
        }
        let principal_id = format!(
            "plasmavmc-{}-{}",
            sanitize_identifier(org_id),
            sanitize_identifier(project_id)
        );
        let principal_ref = PrincipalRef::service_account(&principal_id);
        let principal = match self
            .iam_client
            .get_principal(&principal_ref)
            .await
            .map_err(|e| Status::unavailable(format!("failed to fetch service account: {e}")))?
        {
            Some(principal) => principal,
            None => self
                .iam_client
                .create_service_account(&principal_id, &principal_id, org_id, project_id)
                .await
                .map_err(|e| {
                    Status::unavailable(format!("failed to create service account: {e}"))
                })?,
        };
        let existing_bindings = self
            .iam_client
            .list_bindings_for_principal(&principal_ref)
            .await
            .map_err(|e| Status::unavailable(format!("failed to list IAM bindings: {e}")))?;
        let scope = Scope::project(project_id, org_id);
        let has_binding = existing_bindings
            .iter()
            .any(|binding| binding.role_ref == "roles/ProjectAdmin" && binding.scope == scope);
        if !has_binding {
            let binding = PolicyBinding::new(
                format!("binding-{principal_id}-{project_id}"),
                principal_ref,
                "roles/ProjectAdmin",
                scope.clone(),
            );
            self.iam_client
                .create_binding(&binding)
                .await
                .map_err(|e| Status::unavailable(format!("failed to create IAM binding: {e}")))?;
        }
        self.wait_for_project_admin_access(&principal, org_id, project_id)
            .await?;
        let token = self
            .iam_client
            .issue_token(&principal, vec![], scope, 3600)
            .await
            .map_err(|e| Status::unavailable(format!("failed to issue IAM token: {e}")))?;
        self.project_tokens.insert(
            cache_key,
            CachedToken {
                token: token.clone(),
                expires_at: Instant::now() + Duration::from_secs(55 * 60),
            },
        );
        Ok(token)
    }

    fn image_path(&self, image_id: &str) -> Result<PathBuf, Status> {
        Ok(self
            .image_cache_dir
            .join(format!("{}.qcow2", validated_image_id(image_id)?)))
    }

    fn raw_image_path(&self, image_id: &str) -> Result<PathBuf, Status> {
        Ok(self
            .image_cache_dir
            .join(format!("{}.raw", validated_image_id(image_id)?)))
    }

    fn staging_path(&self, image_id: &str) -> Result<PathBuf, Status> {
        Ok(self
            .image_cache_dir
            .join(format!("{}.source", validated_image_id(image_id)?)))
    }

    async fn abort_multipart_upload(
        &self,
        bucket: &str,
        key: &str,
        upload_id: &str,
        token: &str,
    ) -> Result<(), Status> {
        let mut client = self.object_client().await?;
        let mut request = Request::new(AbortMultipartUploadRequest {
            bucket: bucket.to_string(),
            key: key.to_string(),
            upload_id: upload_id.to_string(),
        });
        attach_bearer(&mut request, token)?;
        client
            .abort_multipart_upload(request)
            .await
            .map_err(|status| Status::from_error(Box::new(status)))?;
        Ok(())
    }

    async fn wait_for_project_admin_access(
        &self,
        principal: &iam_types::Principal,
        org_id: &str,
        project_id: &str,
    ) -> Result<(), Status> {
        let deadline = Instant::now() + Duration::from_secs(30);
        let resource = Resource::new("bucket", "artifact-bootstrap", org_id, project_id);
        let mut last_error: String;
        loop {
            last_error = match self
                .iam_client
                .authorize(principal, "storage:buckets:create", &resource)
                .await
            {
                Ok(true) => return Ok(()),
                Ok(false) => "binding not yet effective".to_string(),
                Err(error) => error.to_string(),
            };
            if Instant::now() >= deadline {
                return Err(Status::failed_precondition(format!(
                    "timed out waiting for IAM ProjectAdmin access for {} in {}/{}: {}",
                    principal.id, org_id, project_id, last_error
                )));
            }
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    }
}

async fn cached_file_size(path: &Path) -> Result<u64, Status> {
    tokio::fs::metadata(path)
        .await
        .map(|metadata| metadata.len())
        .map_err(|e| Status::internal(format!("failed to stat {}: {e}", path.display())))
}

async fn next_uploaded_part(
    uploads: &mut JoinSet<Result<CompletedPart, Status>>,
) -> Result<CompletedPart, Status> {
    match uploads.join_next().await {
        Some(Ok(result)) => result,
        Some(Err(join_error)) => Err(Status::internal(format!(
            "multipart upload task failed: {join_error}"
        ))),
        None => Err(Status::internal(
            "multipart upload queue drained unexpectedly",
        )),
    }
}
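
/// Builds the `qemu-img convert` argument list for qcow2 output: `-t none` and
/// `-T none` bypass the host page cache on the destination and source, `-m` sets
/// the number of parallel conversion coroutines, and `-c` compresses the output.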
"convert".to_string(), "-t".to_string(), "none".to_string(), "-T".to_string(), "none".to_string(), "-m".to_string(), parallelism.to_string(), "-c".to_string(), "-O".to_string(), "qcow2".to_string(), source.to_string_lossy().into_owned(), destination.to_string_lossy().into_owned(), ] } fn qemu_img_convert_to_raw_args( source: &Path, destination: &Path, parallelism: &str, ) -> Vec { vec![ "convert".to_string(), "-t".to_string(), "none".to_string(), "-T".to_string(), "none".to_string(), "-m".to_string(), parallelism.to_string(), "-W".to_string(), "-O".to_string(), "raw".to_string(), source.to_string_lossy().into_owned(), destination.to_string_lossy().into_owned(), ] } fn attach_bearer(request: &mut Request, token: &str) -> Result<(), Status> { let value = MetadataValue::try_from(format!("Bearer {token}")) .map_err(|_| Status::internal("invalid bearer token"))?; request.metadata_mut().insert("authorization", value); Ok(()) } fn normalize_endpoint(endpoint: &str) -> String { if endpoint.starts_with("http://") || endpoint.starts_with("https://") { endpoint.to_string() } else { format!("http://{endpoint}") } } fn sanitize_identifier(value: &str) -> String { value .chars() .map(|ch| if ch.is_ascii_alphanumeric() { ch } else { '-' }) .collect() } fn image_object_key(org_id: &str, project_id: &str, image_id: &str) -> Result { Ok(format!( "{org_id}/{project_id}/{}.qcow2", validated_image_id(image_id)? )) } fn staging_object_key(org_id: &str, project_id: &str, image_id: &str) -> Result { Ok(format!( "{org_id}/{project_id}/uploads/{}.source", validated_image_id(image_id)? )) } fn validated_image_id(image_id: &str) -> Result<&str, Status> { uuid::Uuid::parse_str(image_id) .map(|_| image_id) .map_err(|_| Status::invalid_argument("image_id must be a UUID")) } fn is_public_ip(ip: IpAddr) -> bool { match ip { IpAddr::V4(ip) => { !(ip.is_private() || ip.is_loopback() || ip.is_link_local() || ip.is_multicast() || ip.is_broadcast() || ip.is_documentation() || ip.is_unspecified()) } IpAddr::V6(ip) => { !(ip.is_loopback() || ip.is_unspecified() || ip.is_multicast() || ip.is_unique_local() || ip.is_unicast_link_local()) } } } fn resolve_binary_path( configured_path: Option<&Path>, binary_name: &str, ) -> Result> { let candidate = match configured_path { Some(path) => path.to_path_buf(), None => std::env::var_os("PATH") .into_iter() .flat_map(|paths| std::env::split_paths(&paths).collect::>()) .map(|entry| entry.join(binary_name)) .find(|candidate| candidate.exists()) .ok_or_else(|| { std::io::Error::new( std::io::ErrorKind::NotFound, format!("failed to locate {binary_name} in PATH"), ) })?, }; let metadata = std::fs::metadata(&candidate)?; if !metadata.is_file() { return Err(std::io::Error::new( std::io::ErrorKind::InvalidInput, format!("{} is not a regular file", candidate.display()), ) .into()); } Ok(candidate) } fn local_source_path(source_url: &str) -> Result, Status> { if source_url.starts_with("file://") { let url = Url::parse(source_url) .map_err(|e| Status::invalid_argument(format!("invalid source_url: {e}")))?; let path = url .to_file_path() .map_err(|_| Status::invalid_argument("source_url file:// path must be absolute"))?; return Ok(Some(path)); } if source_url.starts_with('/') { return Ok(Some(PathBuf::from(source_url))); } Ok(None) } async fn ensure_cache_dir_permissions(path: &Path) -> Result<(), Status> { #[cfg(unix)] { use std::os::unix::fs::PermissionsExt; let permissions = std::fs::Permissions::from_mode(0o2770); tokio::fs::set_permissions(path, permissions) .await .map_err(|e| { 
async fn ensure_cache_dir_permissions(path: &Path) -> Result<(), Status> {
    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;
        let permissions = std::fs::Permissions::from_mode(0o2770);
        tokio::fs::set_permissions(path, permissions)
            .await
            .map_err(|e| {
                Status::internal(format!(
                    "failed to set image cache directory permissions on {}: {e}",
                    path.display()
                ))
            })?;
    }
    Ok(())
}

async fn ensure_cache_file_permissions(path: &Path) -> Result<(), Status> {
    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;
        let permissions = std::fs::Permissions::from_mode(0o640);
        tokio::fs::set_permissions(path, permissions)
            .await
            .map_err(|e| {
                Status::internal(format!(
                    "failed to set image cache file permissions on {}: {e}",
                    path.display()
                ))
            })?;
    }
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn qemu_img_convert_to_qcow2_args_include_parallel_direct_io() {
        let args = qemu_img_convert_to_qcow2_args(
            Path::new("/tmp/source.raw"),
            Path::new("/tmp/image.qcow2"),
            "8",
        );
        assert_eq!(
            args,
            vec![
                "convert", "-t", "none", "-T", "none", "-m", "8", "-c", "-O", "qcow2",
                "/tmp/source.raw", "/tmp/image.qcow2",
            ]
        );
    }

    #[test]
    fn qemu_img_convert_to_raw_args_enable_fast_cache_rebuild() {
        let args = qemu_img_convert_to_raw_args(
            Path::new("/tmp/image.qcow2"),
            Path::new("/tmp/image.raw"),
            "8",
        );
        assert_eq!(
            args,
            vec![
                "convert", "-t", "none", "-T", "none", "-m", "8", "-W", "-O", "raw",
                "/tmp/image.qcow2", "/tmp/image.raw",
            ]
        );
    }

    #[test]
    fn image_object_key_rejects_non_uuid_identifiers() {
        assert!(image_object_key("org", "project", "../passwd").is_err());
        assert_eq!(
            image_object_key("org", "project", "11111111-1111-1111-1111-111111111111").unwrap(),
            "org/project/11111111-1111-1111-1111-111111111111.qcow2"
        );
    }

    #[test]
    fn local_source_path_accepts_local_files_and_ignores_https_urls() {
        assert_eq!(
            local_source_path("file:///tmp/source.qcow2").unwrap(),
            Some(PathBuf::from("/tmp/source.qcow2"))
        );
        assert_eq!(
            local_source_path("/var/lib/source.qcow2").unwrap(),
            Some(PathBuf::from("/var/lib/source.qcow2"))
        );
        assert_eq!(
            local_source_path("https://example.com/source.qcow2").unwrap(),
            None
        );
    }
}
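
#[cfg(test)]
mod endpoint_helper_tests {
    // Sketches exercising the pure endpoint and identifier helpers; the expected
    // values follow directly from the helper bodies above.
    use super::*;

    #[test]
    fn normalize_endpoint_prefixes_bare_authorities() {
        assert_eq!(
            normalize_endpoint("lightningstor:50051"),
            "http://lightningstor:50051"
        );
        assert_eq!(
            normalize_endpoint("https://store.example"),
            "https://store.example"
        );
    }

    #[test]
    fn sanitize_identifier_replaces_non_alphanumerics() {
        assert_eq!(sanitize_identifier("org_1/project.a"), "org-1-project-a");
    }
}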