// photoncloud-monorepo/plasmavmc/crates/plasmavmc-server/src/artifact_store.rs

use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};
use dashmap::{DashMap, DashSet};
use iam_client::client::IamClientConfig;
use iam_client::IamClient;
use iam_types::{PolicyBinding, PrincipalRef, Resource, Scope};
use lightningstor_api::proto::{
get_object_response, AbortMultipartUploadRequest, CompleteMultipartUploadRequest,
CompletedPart, CreateBucketRequest, CreateMultipartUploadRequest, DeleteObjectRequest,
GetObjectRequest, PutObjectRequest, UploadPartRequest,
};
use lightningstor_api::{BucketServiceClient, ObjectServiceClient};
use plasmavmc_types::ImageFormat;
use reqwest::StatusCode as HttpStatusCode;
use serde::Deserialize;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::process::Command;
use tokio::task::JoinSet;
use tokio_stream::StreamExt;
use tonic::metadata::MetadataValue;
use tonic::transport::{Channel, Endpoint};
use tonic::{Code, Request, Status};
/// Bucket used for imported VM images when `PLASMAVMC_IMAGE_BUCKET` is unset.
const DEFAULT_IMAGE_BUCKET: &str = "plasmavmc-images";
/// Max gRPC encode/decode message size (1 GiB) so whole-image PutObject bodies fit.
const MAX_OBJECT_GRPC_MESSAGE_SIZE: usize = 1024 * 1024 * 1024;
/// HTTP/2 per-stream flow-control window (64 MiB) sized for bulk transfers.
const OBJECT_GRPC_INITIAL_STREAM_WINDOW: u32 = 64 * 1024 * 1024;
/// HTTP/2 connection-wide flow-control window (512 MiB).
const OBJECT_GRPC_INITIAL_CONNECTION_WINDOW: u32 = 512 * 1024 * 1024;
/// Interval between HTTP/2 keepalive pings on the object-store channel.
const OBJECT_GRPC_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(30);
/// How long to wait for a keepalive ack before the connection is considered dead.
const OBJECT_GRPC_KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(10);
/// Default multipart part size (32 MiB); files at or below one part upload directly.
const DEFAULT_MULTIPART_UPLOAD_PART_SIZE: usize = 32 * 1024 * 1024;
/// Smallest configurable multipart part size (8 MiB).
const MIN_MULTIPART_UPLOAD_PART_SIZE: usize = 8 * 1024 * 1024;
/// Largest configurable multipart part size (128 MiB).
const MAX_MULTIPART_UPLOAD_PART_SIZE: usize = 128 * 1024 * 1024;
/// Default number of part uploads in flight at once.
const DEFAULT_MULTIPART_UPLOAD_CONCURRENCY: usize = 4;
/// Upper bound on configurable part-upload concurrency.
const MAX_MULTIPART_UPLOAD_CONCURRENCY: usize = 32;
/// Default `-m` coroutine count passed to `qemu-img convert` for raw output.
const DEFAULT_RAW_IMAGE_CONVERT_PARALLELISM: usize = 8;
/// Shared handle to the LightningStor-backed artifact store for VM images.
///
/// Cheap to clone: the gRPC channel is multiplexed and all mutable state
/// (token cache, ensured-bucket set) lives behind `Arc`ed concurrent maps.
#[derive(Clone)]
pub struct ArtifactStore {
    // Lazily-connected gRPC channel to the LightningStor endpoint.
    channel: Channel,
    // IAM client used to mint per-project service-account tokens.
    iam_client: Arc<IamClient>,
    // Bucket that stores every imported image object.
    image_bucket: String,
    // Local directory caching qcow2/raw image files.
    image_cache_dir: PathBuf,
    // Project-scoped bearer tokens keyed by "org/project".
    project_tokens: Arc<DashMap<String, CachedToken>>,
    // Buckets already created (or confirmed existing) during this process
    // lifetime, keyed by "org/project/bucket".
    ensured_buckets: Arc<DashSet<String>>,
}
/// Metadata produced by a successful image import.
pub(crate) struct ImportedImage {
    // Size in bytes of the cached qcow2 artifact.
    pub size_bytes: u64,
    // Hex SHA-256 digest of the cached qcow2 artifact.
    pub checksum: String,
    // Stored format; imports always normalize to qcow2.
    pub format: ImageFormat,
}
/// Subset of `qemu-img info --output json` output used to verify the
/// on-disk format of a staged image.
#[derive(Deserialize)]
struct QemuImageInfo {
    // Reported image format, e.g. "qcow2" or "raw".
    format: String,
}
/// A project-scoped bearer token together with its local expiry deadline.
struct CachedToken {
    token: String,
    // Conservative local expiry (set below the server-side TTL) after which
    // a fresh token is issued.
    expires_at: Instant,
}
impl ArtifactStore {
/// Builds an `ArtifactStore` from environment configuration.
///
/// Returns `Ok(None)` when `PLASMAVMC_LIGHTNINGSTOR_ENDPOINT` is unset or
/// blank (artifact storage disabled). Otherwise connects the IAM client,
/// prepares a lazily-connected gRPC channel to LightningStor, and creates
/// the local image cache directory.
///
/// Environment variables read:
/// - `PLASMAVMC_LIGHTNINGSTOR_ENDPOINT`: object-store endpoint (required for `Some`).
/// - `PLASMAVMC_IMAGE_BUCKET`: image bucket name (defaults to "plasmavmc-images").
/// - `PLASMAVMC_IMAGE_CACHE_DIR`: cache dir (defaults to `/var/lib/plasmavmc/images`).
pub async fn from_env(iam_endpoint: &str) -> Result<Option<Self>, Box<dyn std::error::Error>> {
    let Some(raw_endpoint) = std::env::var("PLASMAVMC_LIGHTNINGSTOR_ENDPOINT")
        .ok()
        .map(|value| value.trim().to_string())
        .filter(|value| !value.is_empty())
    else {
        return Ok(None);
    };
    let endpoint = normalize_endpoint(&raw_endpoint);
    let iam_endpoint = normalize_endpoint(iam_endpoint);
    let mut iam_config = IamClientConfig::new(iam_endpoint.clone()).with_timeout(5000);
    // Plain-http IAM endpoints (e.g. in-cluster) must disable TLS.
    if iam_endpoint.starts_with("http://") {
        iam_config = iam_config.without_tls();
    }
    let iam_client = Arc::new(IamClient::connect(iam_config).await?);
    // `connect_lazy`: the channel is only established on the first RPC.
    // Flow-control windows and keepalive are tuned for bulk image transfer.
    let channel = Endpoint::from_shared(endpoint.clone())?
        .tcp_nodelay(true)
        .initial_stream_window_size(OBJECT_GRPC_INITIAL_STREAM_WINDOW)
        .initial_connection_window_size(OBJECT_GRPC_INITIAL_CONNECTION_WINDOW)
        .http2_keep_alive_interval(OBJECT_GRPC_KEEPALIVE_INTERVAL)
        .keep_alive_timeout(OBJECT_GRPC_KEEPALIVE_TIMEOUT)
        .connect_lazy();
    let image_cache_dir = std::env::var("PLASMAVMC_IMAGE_CACHE_DIR")
        .map(PathBuf::from)
        .unwrap_or_else(|_| PathBuf::from("/var/lib/plasmavmc/images"));
    tokio::fs::create_dir_all(&image_cache_dir).await?;
    Ok(Some(Self {
        channel,
        iam_client,
        image_bucket: std::env::var("PLASMAVMC_IMAGE_BUCKET")
            .unwrap_or_else(|_| DEFAULT_IMAGE_BUCKET.to_string()),
        image_cache_dir,
        project_tokens: Arc::new(DashMap::new()),
        ensured_buckets: Arc::new(DashSet::new()),
    }))
}
/// Imports an image from `source_url` into the store.
///
/// Stages the source locally, normalizes it to qcow2 (reusing the staged
/// file directly when it already is a genuine qcow2), uploads the cached
/// artifact, and returns its size and SHA-256 checksum.
pub(crate) async fn import_image(
    &self,
    org_id: &str,
    project_id: &str,
    image_id: &str,
    source_url: &str,
    source_format: ImageFormat,
) -> Result<ImportedImage, Status> {
    let token = self.issue_project_token(org_id, project_id).await?;
    self.ensure_bucket(&self.image_bucket, org_id, project_id, &token)
        .await?;
    let image_path = self.image_path(image_id);
    let staging_path = self.image_cache_dir.join(format!("{image_id}.source"));
    self.materialize_source(source_url, &staging_path).await?;
    if self
        .can_reuse_qcow2_source(&staging_path, source_format)
        .await?
    {
        if tokio::fs::try_exists(&image_path)
            .await
            .map_err(|e| Status::internal(format!("failed to inspect {}: {e}", image_path.display())))?
        {
            // Cache already holds a qcow2 for this image id; drop the fresh copy.
            let _ = tokio::fs::remove_file(&staging_path).await;
        } else {
            tokio::fs::rename(&staging_path, &image_path)
                .await
                .map_err(|e| Status::internal(format!(
                    "failed to move qcow2 image {} into cache {}: {e}",
                    staging_path.display(),
                    image_path.display()
                )))?;
        }
    } else {
        // Normalize non-qcow2 inputs through qemu-img convert so the cached
        // artifact has a stable qcow2 representation before upload.
        let converted = self.convert_to_qcow2(&staging_path, &image_path).await;
        // Remove the staging copy whether or not conversion succeeded; the
        // previous code leaked the (potentially large) source file in the
        // cache directory when conversion failed.
        let _ = tokio::fs::remove_file(&staging_path).await;
        converted?;
    }
    let checksum = self.sha256sum(&image_path).await?;
    let metadata = tokio::fs::metadata(&image_path).await.map_err(|e| {
        Status::internal(format!("failed to stat {}: {e}", image_path.display()))
    })?;
    let image_key = image_object_key(org_id, project_id, image_id);
    self.upload_file(&self.image_bucket, &image_key, &image_path, &token)
        .await?;
    Ok(ImportedImage {
        size_bytes: metadata.len(),
        checksum,
        // Every import is normalized to qcow2 regardless of source format.
        format: ImageFormat::Qcow2,
    })
}
/// Ensures the qcow2 image object is present in the local cache and returns
/// its path. Re-downloads nothing when the cached file already exists.
pub async fn materialize_image_cache(
    &self,
    org_id: &str,
    project_id: &str,
    image_id: &str,
) -> Result<PathBuf, Status> {
    // Acquire a project-scoped token and make sure the bucket exists before
    // attempting the download.
    let token = self.issue_project_token(org_id, project_id).await?;
    self.ensure_bucket(&self.image_bucket, org_id, project_id, &token)
        .await?;
    let destination = self.image_path(image_id);
    let object_key = image_object_key(org_id, project_id, image_id);
    self.download_object_to_file(&self.image_bucket, &object_key, &destination, &token)
        .await?;
    Ok(destination)
}
/// Ensures a raw-format copy of the image exists in the local cache and
/// returns its path, deriving it from the cached qcow2 when needed.
pub async fn materialize_raw_image_cache(
    &self,
    org_id: &str,
    project_id: &str,
    image_id: &str,
) -> Result<PathBuf, Status> {
    // First make sure the qcow2 source is cached locally, then convert it
    // (the conversion is skipped when the raw file already exists).
    let qcow2_path = self
        .materialize_image_cache(org_id, project_id, image_id)
        .await?;
    let raw_path = self.raw_image_path(image_id);
    self.convert_to_raw(&qcow2_path, &raw_path).await?;
    Ok(raw_path)
}
/// Deletes the image object from LightningStor and evicts any locally
/// cached qcow2/raw copies.
///
/// The operation is idempotent: a missing remote object and missing cache
/// files are both treated as success.
pub(crate) async fn delete_image(
    &self,
    org_id: &str,
    project_id: &str,
    image_id: &str,
) -> Result<(), Status> {
    let token = self.issue_project_token(org_id, project_id).await?;
    let image_key = image_object_key(org_id, project_id, image_id);
    let mut client = self.object_client().await?;
    let mut request = Request::new(DeleteObjectRequest {
        bucket: self.image_bucket.clone(),
        key: image_key,
        version_id: String::new(),
    });
    attach_bearer(&mut request, &token)?;
    match client.delete_object(request).await {
        Ok(_) => {}
        Err(status) if status.code() == Code::NotFound => {}
        Err(status) => return Err(Status::from_error(Box::new(status))),
    }
    // Remove cache files directly instead of try_exists + remove_file: the
    // check-then-act pair raced with concurrent deletions of the same image.
    // A NotFound error simply means there is nothing to evict.
    for path in [self.image_path(image_id), self.raw_image_path(image_id)] {
        match tokio::fs::remove_file(&path).await {
            Ok(()) => {}
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
            Err(e) => {
                return Err(Status::internal(format!(
                    "failed to remove {}: {e}",
                    path.display()
                )))
            }
        }
    }
    Ok(())
}
/// Creates `bucket` for the given org/project unless this process already
/// confirmed it exists. An `AlreadyExists` reply from the server counts as
/// success; either outcome is remembered so later calls short-circuit.
async fn ensure_bucket(
    &self,
    bucket: &str,
    org_id: &str,
    project_id: &str,
    token: &str,
) -> Result<(), Status> {
    let bucket_key = format!("{org_id}/{project_id}/{bucket}");
    if self.ensured_buckets.contains(&bucket_key) {
        return Ok(());
    }
    let mut request = Request::new(CreateBucketRequest {
        bucket: bucket.to_string(),
        region: "default".to_string(),
        org_id: org_id.to_string(),
        project_id: project_id.to_string(),
    });
    attach_bearer(&mut request, token)?;
    let mut client = self.bucket_client().await?;
    match client.create_bucket(request).await {
        Ok(_) => {}
        Err(status) if status.code() == Code::AlreadyExists => {}
        Err(status) => return Err(Status::from_error(Box::new(status))),
    }
    self.ensured_buckets.insert(bucket_key);
    Ok(())
}
/// Uploads a local file to `bucket`/`key`, picking direct vs multipart
/// based on whether the file exceeds one configured part size.
async fn upload_file(
    &self,
    bucket: &str,
    key: &str,
    path: &Path,
    token: &str,
) -> Result<(), Status> {
    let metadata = tokio::fs::metadata(path)
        .await
        .map_err(|e| Status::internal(format!("failed to stat {path:?}: {e}")))?;
    let part_size = multipart_upload_part_size() as u64;
    if metadata.len() <= part_size {
        self.upload_file_direct(bucket, key, path, token).await
    } else {
        self.upload_file_multipart(bucket, key, path, token, metadata.len())
            .await
    }
}
/// Uploads the entire file as a single PutObject call.
///
/// Used for artifacts at or below the multipart threshold; the whole file
/// is buffered in memory before the RPC.
async fn upload_file_direct(
    &self,
    bucket: &str,
    key: &str,
    path: &Path,
    token: &str,
) -> Result<(), Status> {
    let start = Instant::now();
    let payload = tokio::fs::read(path)
        .await
        .map_err(|e| Status::internal(format!("failed to read {path:?}: {e}")))?;
    tracing::info!(
        bucket = bucket,
        key = key,
        path = %path.display(),
        bytes = payload.len(),
        "Uploading artifact object"
    );
    let mut request = Request::new(PutObjectRequest {
        bucket: bucket.to_string(),
        key: key.to_string(),
        body: payload.into(),
        metadata: None,
        content_md5: String::new(),
        if_none_match: String::new(),
    });
    attach_bearer(&mut request, token)?;
    self.object_client()
        .await?
        .put_object(request)
        .await
        .map_err(|status| Status::from_error(Box::new(status)))?;
    tracing::info!(
        bucket = bucket,
        key = key,
        path = %path.display(),
        elapsed_ms = start.elapsed().as_millis() as u64,
        "Finished uploading artifact object"
    );
    Ok(())
}
/// Uploads `path` as a multipart object: fixed-size parts are read
/// sequentially and uploaded with bounded concurrency, then the upload is
/// completed. Any part/complete failure aborts the server-side session
/// (best-effort) before returning the error.
async fn upload_file_multipart(
    &self,
    bucket: &str,
    key: &str,
    path: &Path,
    token: &str,
    size_bytes: u64,
) -> Result<(), Status> {
    let started = Instant::now();
    let multipart_part_size = multipart_upload_part_size();
    tracing::info!(
        bucket = bucket,
        key = key,
        path = %path.display(),
        size_bytes,
        part_size = multipart_part_size,
        "Uploading artifact object with multipart upload"
    );
    let mut client = self.object_client().await?;
    let mut create_request = Request::new(CreateMultipartUploadRequest {
        bucket: bucket.to_string(),
        key: key.to_string(),
        metadata: None,
    });
    attach_bearer(&mut create_request, token)?;
    let upload_id = client
        .create_multipart_upload(create_request)
        .await
        .map_err(|status| Status::from_error(Box::new(status)))?
        .into_inner()
        .upload_id;
    let mut file = tokio::fs::File::open(path)
        .await
        .map_err(|e| Status::internal(format!("failed to open {path:?}: {e}")))?;
    // Part numbers are 1-based; `completed_parts` collects out-of-order
    // completions and is sorted before the complete call.
    let mut part_number = 1u32;
    let mut completed_parts = Vec::new();
    let mut uploads = JoinSet::new();
    let upload_concurrency = multipart_upload_concurrency();
    // Spawns one part upload onto the JoinSet; each task owns clones of the
    // client and identifiers so it runs independently of the read loop.
    let enqueue_part_upload = |uploads: &mut JoinSet<Result<CompletedPart, Status>>,
                               client: &ObjectServiceClient<Channel>,
                               part_number: u32,
                               chunk: Vec<u8>| {
        let mut client = client.clone();
        let bucket = bucket.to_string();
        let key = key.to_string();
        let upload_id = upload_id.clone();
        let token = token.to_string();
        uploads.spawn(async move {
            // upload_part is a client-streaming RPC; this sends the whole
            // part as a single stream element.
            let request_stream = tokio_stream::iter(vec![UploadPartRequest {
                bucket,
                key,
                upload_id,
                part_number,
                body: chunk.into(),
                content_md5: String::new(),
            }]);
            let mut request = Request::new(request_stream);
            attach_bearer(&mut request, &token)?;
            let response = client
                .upload_part(request)
                .await
                .map_err(|status| Status::from_error(Box::new(status)))?;
            Ok(CompletedPart {
                part_number,
                etag: response.into_inner().etag,
            })
        });
    };
    loop {
        // Fill a full part (or the final short part) before uploading: a
        // short read from `read` does not mean EOF, so keep reading until
        // the buffer is full or `read` returns 0.
        let mut chunk = vec![0u8; multipart_part_size];
        let mut bytes_read = 0usize;
        while bytes_read < chunk.len() {
            let read_now = file
                .read(&mut chunk[bytes_read..])
                .await
                .map_err(|e| Status::internal(format!("failed to read {path:?}: {e}")))?;
            if read_now == 0 {
                break;
            }
            bytes_read += read_now;
        }
        if bytes_read == 0 {
            break;
        }
        chunk.truncate(bytes_read);
        enqueue_part_upload(&mut uploads, &client, part_number, chunk);
        part_number += 1;
        // Bound in-flight uploads: once at the concurrency limit, reap one
        // finished part before reading the next chunk.
        if uploads.len() >= upload_concurrency {
            match next_uploaded_part(&mut uploads).await {
                Ok(part) => completed_parts.push(part),
                Err(status) => {
                    // Cancel outstanding tasks, drain them, then abort the
                    // server-side session (best-effort) before failing.
                    uploads.abort_all();
                    while uploads.join_next().await.is_some() {}
                    let _ = self
                        .abort_multipart_upload(bucket, key, &upload_id, token)
                        .await;
                    return Err(status);
                }
            }
        }
    }
    // Drain the remaining in-flight parts after EOF.
    while !uploads.is_empty() {
        match next_uploaded_part(&mut uploads).await {
            Ok(part) => completed_parts.push(part),
            Err(status) => {
                uploads.abort_all();
                while uploads.join_next().await.is_some() {}
                let _ = self
                    .abort_multipart_upload(bucket, key, &upload_id, token)
                    .await;
                return Err(status);
            }
        }
    }
    // Empty file: nothing was read, so abandon the multipart session and
    // fall back to a direct (empty-body) upload.
    if completed_parts.is_empty() {
        let _ = self
            .abort_multipart_upload(bucket, key, &upload_id, token)
            .await;
        return self.upload_file_direct(bucket, key, path, token).await;
    }
    completed_parts.sort_by_key(|part| part.part_number);
    let mut complete_request = Request::new(CompleteMultipartUploadRequest {
        bucket: bucket.to_string(),
        key: key.to_string(),
        upload_id: upload_id.clone(),
        parts: completed_parts,
    });
    attach_bearer(&mut complete_request, token)?;
    if let Err(status) = client.complete_multipart_upload(complete_request).await {
        let _ = self
            .abort_multipart_upload(bucket, key, &upload_id, token)
            .await;
        return Err(Status::from_error(Box::new(status)));
    }
    tracing::info!(
        bucket = bucket,
        key = key,
        path = %path.display(),
        size_bytes,
        part_count = part_number - 1,
        elapsed_ms = started.elapsed().as_millis() as u64,
        "Finished multipart artifact upload"
    );
    Ok(())
}
/// Downloads `bucket`/`key` into the local cache at `path`.
///
/// Returns immediately on a cache hit (file already exists). Otherwise the
/// object is streamed into a `.download` temp file and renamed into place
/// only once complete, so `path` never exposes a partial object.
async fn download_object_to_file(
    &self,
    bucket: &str,
    key: &str,
    path: &Path,
    token: &str,
) -> Result<(), Status> {
    if tokio::fs::try_exists(path)
        .await
        .map_err(|e| Status::internal(format!("failed to inspect cache path {path:?}: {e}")))?
    {
        let size_bytes = cached_file_size(path).await?;
        tracing::info!(
            bucket = bucket,
            key = key,
            path = %path.display(),
            size_bytes,
            "Artifact cache hit"
        );
        return Ok(());
    }
    if let Some(parent) = path.parent() {
        tokio::fs::create_dir_all(parent)
            .await
            .map_err(|e| Status::internal(format!("failed to create {parent:?}: {e}")))?;
    }
    let started = Instant::now();
    tracing::info!(
        bucket = bucket,
        key = key,
        path = %path.display(),
        "Downloading artifact object into local cache"
    );
    let mut client = self.object_client().await?;
    // range_start/range_end of -1 request the whole object.
    let mut request = Request::new(GetObjectRequest {
        bucket: bucket.to_string(),
        key: key.to_string(),
        version_id: String::new(),
        range_start: -1,
        range_end: -1,
        if_match: String::new(),
        if_none_match: String::new(),
        if_modified_since: None,
        if_unmodified_since: None,
    });
    attach_bearer(&mut request, token)?;
    let mut stream = client
        .get_object(request)
        .await
        .map_err(|status| Status::from_error(Box::new(status)))?
        .into_inner();
    let temp_path = path.with_extension("download");
    let mut file = tokio::fs::File::create(&temp_path)
        .await
        .map_err(|e| Status::internal(format!("failed to create {temp_path:?}: {e}")))?;
    let mut bytes_written: u64 = 0;
    // Append every body chunk; non-body frames in the response stream are
    // ignored.
    while let Some(item) = stream.next().await {
        let response = item.map_err(|status| Status::from_error(Box::new(status)))?;
        if let Some(get_object_response::Content::BodyChunk(chunk)) = response.content {
            bytes_written += chunk.len() as u64;
            tokio::io::AsyncWriteExt::write_all(&mut file, &chunk)
                .await
                .map_err(|e| Status::internal(format!("failed to write {temp_path:?}: {e}")))?;
        }
    }
    tokio::io::AsyncWriteExt::flush(&mut file)
        .await
        .map_err(|e| Status::internal(format!("failed to flush {temp_path:?}: {e}")))?;
    // Close the handle before the rename publishes the file.
    drop(file);
    tokio::fs::rename(&temp_path, path)
        .await
        .map_err(|e| Status::internal(format!("failed to move object into cache: {e}")))?;
    tracing::info!(
        bucket = bucket,
        key = key,
        path = %path.display(),
        bytes_written,
        elapsed_ms = started.elapsed().as_millis() as u64,
        "Finished downloading artifact object"
    );
    Ok(())
}
/// Stages the image source referenced by `source_url` at `path`.
///
/// Supported forms: `file://` URLs, bare absolute paths, and `http(s)://`
/// URLs. HTTP downloads stream into a `.download` temp file and are renamed
/// into place only once complete, so `path` never exposes a partial file.
async fn materialize_source(&self, source_url: &str, path: &Path) -> Result<(), Status> {
    if let Some(parent) = path.parent() {
        tokio::fs::create_dir_all(parent)
            .await
            .map_err(|e| Status::internal(format!("failed to create {parent:?}: {e}")))?;
    }
    if let Some(source_path) = source_url.strip_prefix("file://") {
        tokio::fs::copy(source_path, path).await.map_err(|e| {
            Status::internal(format!("failed to copy image source {source_path}: {e}"))
        })?;
        return Ok(());
    }
    if source_url.starts_with('/') {
        tokio::fs::copy(source_url, path).await.map_err(|e| {
            Status::internal(format!("failed to copy image source {source_url}: {e}"))
        })?;
        return Ok(());
    }
    if source_url.starts_with("http://") || source_url.starts_with("https://") {
        let mut response = reqwest::get(source_url).await.map_err(|e| {
            Status::unavailable(format!("failed to download image source: {e}"))
        })?;
        // Accept any 2xx status (reqwest has already followed redirects at
        // this point); the previous `!= 200` check wrongly rejected other
        // success responses such as 203.
        if !response.status().is_success() {
            return Err(Status::failed_precondition(format!(
                "image download failed with HTTP {}",
                response.status()
            )));
        }
        let temp_path = path.with_extension("download");
        let mut file = tokio::fs::File::create(&temp_path).await.map_err(|e| {
            Status::internal(format!(
                "failed to create downloaded image {}: {e}",
                temp_path.display()
            ))
        })?;
        while let Some(chunk) = response.chunk().await.map_err(|e| {
            Status::unavailable(format!("failed to read image response body: {e}"))
        })? {
            file.write_all(&chunk).await.map_err(|e| {
                Status::internal(format!(
                    "failed to write downloaded image {}: {e}",
                    temp_path.display()
                ))
            })?;
        }
        file.flush().await.map_err(|e| {
            Status::internal(format!(
                "failed to flush downloaded image {}: {e}",
                temp_path.display()
            ))
        })?;
        drop(file);
        tokio::fs::rename(&temp_path, path).await.map_err(|e| {
            Status::internal(format!(
                "failed to finalize downloaded image {}: {e}",
                path.display()
            ))
        })?;
        return Ok(());
    }
    Err(Status::invalid_argument(
        "source_url must be file://, an absolute path, or http(s)://",
    ))
}
/// Converts `source` into a compressed qcow2 file at `destination` via
/// `qemu-img convert -c`. The conversion is skipped when the destination
/// already exists.
async fn convert_to_qcow2(&self, source: &Path, destination: &Path) -> Result<(), Status> {
    let already_converted = tokio::fs::try_exists(destination)
        .await
        .map_err(|e| Status::internal(format!("failed to inspect {destination:?}: {e}")))?;
    if already_converted {
        return Ok(());
    }
    if let Some(parent) = destination.parent() {
        tokio::fs::create_dir_all(parent)
            .await
            .map_err(|e| Status::internal(format!("failed to create {parent:?}: {e}")))?;
    }
    let source_arg = source.to_string_lossy();
    let destination_arg = destination.to_string_lossy();
    // `-c` produces a compressed qcow2 image.
    let status = Command::new("qemu-img")
        .args([
            "convert",
            "-c",
            "-O",
            "qcow2",
            source_arg.as_ref(),
            destination_arg.as_ref(),
        ])
        .status()
        .await
        .map_err(|e| Status::internal(format!("failed to spawn qemu-img convert: {e}")))?;
    if !status.success() {
        return Err(Status::internal(format!(
            "qemu-img convert failed for {} with status {status}",
            source.display()
        )));
    }
    Ok(())
}
/// Converts a cached image into a raw file at `destination` with
/// `qemu-img convert`, bypassing host caches (`-t none -T none`) and using
/// the configured coroutine parallelism (`-m`). Skipped when the
/// destination already exists.
async fn convert_to_raw(&self, source: &Path, destination: &Path) -> Result<(), Status> {
    let already_converted = tokio::fs::try_exists(destination)
        .await
        .map_err(|e| Status::internal(format!("failed to inspect {destination:?}: {e}")))?;
    if already_converted {
        return Ok(());
    }
    if let Some(parent) = destination.parent() {
        tokio::fs::create_dir_all(parent)
            .await
            .map_err(|e| Status::internal(format!("failed to create {parent:?}: {e}")))?;
    }
    let parallelism = raw_image_convert_parallelism().to_string();
    let source_arg = source.to_string_lossy();
    let destination_arg = destination.to_string_lossy();
    let status = Command::new("qemu-img")
        .args([
            "convert",
            "-t",
            "none",
            "-T",
            "none",
            "-m",
            parallelism.as_str(),
            "-O",
            "raw",
            source_arg.as_ref(),
            destination_arg.as_ref(),
        ])
        .status()
        .await
        .map_err(|e| Status::internal(format!("failed to spawn qemu-img convert: {e}")))?;
    if !status.success() {
        return Err(Status::internal(format!(
            "qemu-img convert to raw failed for {} with status {status}",
            source.display()
        )));
    }
    Ok(())
}
/// Returns `true` when the staged source both claims to be and actually is
/// a qcow2 image (verified with `qemu-img info`), so conversion can be
/// skipped and the file used directly.
async fn can_reuse_qcow2_source(
    &self,
    path: &Path,
    source_format: ImageFormat,
) -> Result<bool, Status> {
    if !matches!(source_format, ImageFormat::Qcow2) {
        return Ok(false);
    }
    let output = Command::new("qemu-img")
        .args(["info", "--output", "json", path.to_string_lossy().as_ref()])
        .output()
        .await
        .map_err(|e| Status::internal(format!("failed to spawn qemu-img info: {e}")))?;
    // A probe failure is not fatal: fall back to converting the image.
    if !output.status.success() {
        return Ok(false);
    }
    let info: QemuImageInfo = serde_json::from_slice(&output.stdout)
        .map_err(|e| Status::internal(format!("failed to parse qemu-img info output: {e}")))?;
    Ok(info.format == "qcow2")
}
/// Computes the hex SHA-256 digest of `path` by shelling out to `sha256sum`.
async fn sha256sum(&self, path: &Path) -> Result<String, Status> {
    let output = Command::new("sha256sum")
        .arg(path)
        .output()
        .await
        .map_err(|e| Status::internal(format!("failed to spawn sha256sum: {e}")))?;
    if !output.status.success() {
        return Err(Status::internal(format!(
            "sha256sum failed for {} with status {}",
            path.display(),
            output.status
        )));
    }
    // Output has the shape "<digest>  <path>"; keep only the digest token.
    String::from_utf8(output.stdout)
        .map_err(|e| Status::internal(format!("invalid sha256sum output: {e}")))?
        .split_whitespace()
        .next()
        .map(str::to_string)
        .ok_or_else(|| Status::internal("sha256sum output missing digest"))
}
/// Builds a `BucketServiceClient` over the shared channel with raised
/// message-size limits for bulk transfers.
///
/// NOTE(review): cannot actually fail; kept `async -> Result` so call
/// sites stay uniform with `object_client`.
async fn bucket_client(&self) -> Result<BucketServiceClient<Channel>, Status> {
    Ok(BucketServiceClient::new(self.channel.clone())
        .max_decoding_message_size(MAX_OBJECT_GRPC_MESSAGE_SIZE)
        .max_encoding_message_size(MAX_OBJECT_GRPC_MESSAGE_SIZE))
}
/// Builds an `ObjectServiceClient` over the shared channel with raised
/// message-size limits for bulk transfers.
///
/// NOTE(review): cannot actually fail; kept `async -> Result` so call
/// sites stay uniform with `bucket_client`.
async fn object_client(&self) -> Result<ObjectServiceClient<Channel>, Status> {
    Ok(ObjectServiceClient::new(self.channel.clone())
        .max_decoding_message_size(MAX_OBJECT_GRPC_MESSAGE_SIZE)
        .max_encoding_message_size(MAX_OBJECT_GRPC_MESSAGE_SIZE))
}
/// Issues (or returns a cached) project-scoped bearer token for accessing
/// LightningStor on behalf of `org_id`/`project_id`.
///
/// On a cache miss this bootstraps the per-project service account,
/// grants it `roles/ProjectAdmin` on the project scope if missing, waits
/// for the binding to become effective, then issues a 1-hour token which
/// is cached locally for 55 minutes.
async fn issue_project_token(&self, org_id: &str, project_id: &str) -> Result<String, Status> {
    let cache_key = format!("{org_id}/{project_id}");
    // Reuse the cached token only while it still has >60s of validity left.
    if let Some(cached) = self.project_tokens.get(&cache_key) {
        if cached.expires_at > Instant::now() + Duration::from_secs(60) {
            return Ok(cached.token.clone());
        }
    }
    let principal_id = format!(
        "plasmavmc-{}-{}",
        sanitize_identifier(org_id),
        sanitize_identifier(project_id)
    );
    let principal_ref = PrincipalRef::service_account(&principal_id);
    // Fetch the service account, creating it on first use.
    let principal = match self
        .iam_client
        .get_principal(&principal_ref)
        .await
        .map_err(|e| Status::unavailable(format!("failed to fetch service account: {e}")))?
    {
        Some(principal) => principal,
        None => self
            .iam_client
            .create_service_account(&principal_id, &principal_id, project_id)
            .await
            .map_err(|e| {
                Status::unavailable(format!("failed to create service account: {e}"))
            })?,
    };
    let existing_bindings = self
        .iam_client
        .list_bindings_for_principal(&principal_ref)
        .await
        .map_err(|e| Status::unavailable(format!("failed to list IAM bindings: {e}")))?;
    let scope = Scope::project(project_id, org_id);
    let has_binding = existing_bindings
        .iter()
        .any(|binding| binding.role_ref == "roles/ProjectAdmin" && binding.scope == scope);
    if !has_binding {
        let binding = PolicyBinding::new(
            format!("binding-{principal_id}-{project_id}"),
            principal_ref,
            "roles/ProjectAdmin",
            scope.clone(),
        );
        self.iam_client
            .create_binding(&binding)
            .await
            .map_err(|e| Status::unavailable(format!("failed to create IAM binding: {e}")))?;
    }
    // Newly-created bindings may take time to propagate; block until the
    // account can actually create buckets (or the wait times out).
    self.wait_for_project_admin_access(&principal, org_id, project_id)
        .await?;
    let token = self
        .iam_client
        .issue_token(&principal, vec![], scope, 3600)
        .await
        .map_err(|e| Status::unavailable(format!("failed to issue IAM token: {e}")))?;
    // Cache below the 1h token TTL (55 min) so refreshes happen before expiry.
    self.project_tokens.insert(
        cache_key,
        CachedToken {
            token: token.clone(),
            expires_at: Instant::now() + Duration::from_secs(55 * 60),
        },
    );
    Ok(token)
}
/// Local cache path for an image's qcow2 file.
fn image_path(&self, image_id: &str) -> PathBuf {
    let file_name = format!("{image_id}.qcow2");
    self.image_cache_dir.join(file_name)
}
/// Local cache path for an image's raw-format file.
fn raw_image_path(&self, image_id: &str) -> PathBuf {
    let file_name = format!("{image_id}.raw");
    self.image_cache_dir.join(file_name)
}
/// Tells the server to discard an in-flight multipart upload session.
async fn abort_multipart_upload(
    &self,
    bucket: &str,
    key: &str,
    upload_id: &str,
    token: &str,
) -> Result<(), Status> {
    let mut request = Request::new(AbortMultipartUploadRequest {
        bucket: bucket.to_string(),
        key: key.to_string(),
        upload_id: upload_id.to_string(),
    });
    attach_bearer(&mut request, token)?;
    self.object_client()
        .await?
        .abort_multipart_upload(request)
        .await
        .map_err(|status| Status::from_error(Box::new(status)))?;
    Ok(())
}
/// Polls IAM once per second until the service account is authorized to
/// create buckets in the project, giving freshly-created bindings up to
/// 30 seconds to become effective.
async fn wait_for_project_admin_access(
    &self,
    principal: &iam_types::Principal,
    org_id: &str,
    project_id: &str,
) -> Result<(), Status> {
    let deadline = Instant::now() + Duration::from_secs(30);
    let resource = Resource::new("bucket", "artifact-bootstrap", org_id, project_id);
    loop {
        // Record why this attempt failed so the timeout error can report it.
        let last_error = match self
            .iam_client
            .authorize(principal, "storage:buckets:create", &resource)
            .await
        {
            Ok(true) => return Ok(()),
            Ok(false) => "binding not yet effective".to_string(),
            Err(error) => error.to_string(),
        };
        if Instant::now() >= deadline {
            return Err(Status::failed_precondition(format!(
                "timed out waiting for IAM ProjectAdmin access for {} in {}/{}: {}",
                principal.id, org_id, project_id, last_error
            )));
        }
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}
}
/// Returns the size in bytes of an already-cached file.
async fn cached_file_size(path: &Path) -> Result<u64, Status> {
    let metadata = tokio::fs::metadata(path)
        .await
        .map_err(|e| Status::internal(format!("failed to stat {}: {e}", path.display())))?;
    Ok(metadata.len())
}
/// Awaits the next finished part-upload task, translating join failures
/// (panic/cancellation) and an unexpectedly empty set into `Status` errors.
async fn next_uploaded_part(
    uploads: &mut JoinSet<Result<CompletedPart, Status>>,
) -> Result<CompletedPart, Status> {
    let Some(joined) = uploads.join_next().await else {
        return Err(Status::internal(
            "multipart upload queue drained unexpectedly",
        ));
    };
    joined.map_err(|join_error| {
        Status::internal(format!("multipart upload task failed: {join_error}"))
    })?
}
/// Number of concurrent multipart part uploads, read from
/// `PLASMAVMC_LIGHTNINGSTOR_MULTIPART_CONCURRENCY` (clamped to
/// 1..=MAX_MULTIPART_UPLOAD_CONCURRENCY), defaulting when unset or unparsable.
fn multipart_upload_concurrency() -> usize {
    let configured = std::env::var("PLASMAVMC_LIGHTNINGSTOR_MULTIPART_CONCURRENCY")
        .ok()
        .and_then(|raw| raw.parse::<usize>().ok());
    match configured {
        Some(value) => value.clamp(1, MAX_MULTIPART_UPLOAD_CONCURRENCY),
        None => DEFAULT_MULTIPART_UPLOAD_CONCURRENCY,
    }
}
/// Multipart part size in bytes, read from
/// `PLASMAVMC_LIGHTNINGSTOR_MULTIPART_PART_SIZE` (clamped to the configured
/// min/max), defaulting when unset or unparsable.
fn multipart_upload_part_size() -> usize {
    let configured = std::env::var("PLASMAVMC_LIGHTNINGSTOR_MULTIPART_PART_SIZE")
        .ok()
        .and_then(|raw| raw.parse::<usize>().ok());
    match configured {
        Some(value) => {
            value.clamp(MIN_MULTIPART_UPLOAD_PART_SIZE, MAX_MULTIPART_UPLOAD_PART_SIZE)
        }
        None => DEFAULT_MULTIPART_UPLOAD_PART_SIZE,
    }
}
/// `qemu-img convert` coroutine count (`-m`), read from
/// `PLASMAVMC_RAW_IMAGE_CONVERT_PARALLELISM` (clamped to 1..=64),
/// defaulting when unset or unparsable.
fn raw_image_convert_parallelism() -> usize {
    let configured = std::env::var("PLASMAVMC_RAW_IMAGE_CONVERT_PARALLELISM")
        .ok()
        .and_then(|raw| raw.parse::<usize>().ok());
    match configured {
        Some(value) => value.clamp(1, 64),
        None => DEFAULT_RAW_IMAGE_CONVERT_PARALLELISM,
    }
}
/// Sets the `authorization: Bearer <token>` metadata header on a request.
/// Fails only if the token contains bytes invalid in gRPC metadata.
fn attach_bearer<T>(request: &mut Request<T>, token: &str) -> Result<(), Status> {
    let header = format!("Bearer {token}");
    let value =
        MetadataValue::try_from(header).map_err(|_| Status::internal("invalid bearer token"))?;
    request.metadata_mut().insert("authorization", value);
    Ok(())
}
/// Ensures the endpoint carries an explicit scheme, prefixing bare
/// host:port values with `http://`.
fn normalize_endpoint(endpoint: &str) -> String {
    let has_scheme = ["http://", "https://"]
        .iter()
        .any(|scheme| endpoint.starts_with(scheme));
    if has_scheme {
        endpoint.to_string()
    } else {
        format!("http://{endpoint}")
    }
}
/// Replaces every non-ASCII-alphanumeric character with `-` so the value is
/// safe to embed in a service-account id.
fn sanitize_identifier(value: &str) -> String {
    let mut sanitized = String::with_capacity(value.len());
    for ch in value.chars() {
        sanitized.push(if ch.is_ascii_alphanumeric() { ch } else { '-' });
    }
    sanitized
}
/// Object-store key for an image: `<org>/<project>/<image>.qcow2`.
fn image_object_key(org_id: &str, project_id: &str, image_id: &str) -> String {
    let mut key = [org_id, project_id, image_id].join("/");
    key.push_str(".qcow2");
    key
}