photoncloud-monorepo/k8shost/crates/k8shost-server/tests/creditservice_pod_integration.rs
centra d2149b6249 fix(lightningstor): Fix SigV4 canonicalization for AWS S3 auth
- Replace form_urlencoded with RFC 3986 compliant URI encoding
- Implement aws_uri_encode() matching AWS SigV4 spec exactly
- Unreserved chars (A-Z,a-z,0-9,-,_,.,~) not encoded
- All other chars percent-encoded with uppercase hex
- Preserve slashes in paths, encode in query params
- Normalize empty paths to '/' per AWS spec
- Fix test expectations (body hash, HMAC values)
- Add comprehensive SigV4 signature determinism test

This fixes the canonicalization mismatch that caused signature
validation failures in T047. Auth can now be enabled for production.

Refs: T058.S1
2025-12-12 06:23:46 +09:00

349 lines
12 KiB
Rust

//! CreditService integration test for k8shost Pod admission control
//!
//! Tests the 2-phase admission control flow for Pod creation:
//! 1. check_quota - validates balance/quota limits
//! 2. reserve_credits - reserves credits with TTL (Phase 1)
//! 3. [Create Pod] - actual Pod storage
//! 4. commit_reservation - commits credits on success (Phase 2)
//! 5. release_reservation - releases credits on failure (rollback)
use creditservice_api::{CreditServiceImpl, CreditStorage, InMemoryStorage};
use creditservice_client::Client as CreditServiceClient;
use creditservice_proto::credit_service_server::CreditServiceServer;
use k8shost_proto::{
pod_service_client::PodServiceClient, CreatePodRequest, Pod as ProtoPod, ObjectMeta as ProtoObjectMeta,
PodSpec as ProtoPodSpec, Container as ProtoContainer,
};
use k8shost_server::services::pod::PodServiceImpl;
use k8shost_server::storage::Storage;
use std::sync::Arc;
use tonic::transport::{Channel, Server};
use tonic::codegen::InterceptedService;
use tonic::service::Interceptor;
use tonic::Request;
use std::collections::HashMap;
/// Client-side gRPC interceptor state: the tenant identity that is attached
/// to outgoing requests as metadata.
struct TenantInterceptor {
    /// Value sent as the `org-id` metadata entry.
    org: String,
    /// Value sent as the `project-id` metadata entry.
    project: String,
}
/// Attaches the tenant identity to each outgoing request as gRPC metadata.
impl Interceptor for TenantInterceptor {
    /// Inserts the `org-id` and `project-id` metadata entries into `req`.
    ///
    /// # Errors
    ///
    /// Returns `Status::invalid_argument` when either configured value cannot
    /// be parsed as an ASCII metadata value. The interceptor already has an
    /// error channel, so a bad tenant string is reported through it rather
    /// than panicking inside the client stack.
    fn call(&mut self, mut req: Request<()>) -> Result<Request<()>, tonic::Status> {
        let org = self
            .org
            .parse::<tonic::metadata::AsciiMetadataValue>()
            .map_err(|_| tonic::Status::invalid_argument("org-id is not a valid metadata value"))?;
        let project = self
            .project
            .parse::<tonic::metadata::AsciiMetadataValue>()
            .map_err(|_| {
                tonic::Status::invalid_argument("project-id is not a valid metadata value")
            })?;

        let metadata = req.metadata_mut();
        metadata.insert("org-id", org);
        metadata.insert("project-id", project);
        Ok(req)
    }
}
/// Connects to the Pod service listening on `addr` (plain HTTP) and returns a
/// client whose requests pass through a [`TenantInterceptor`] configured with
/// `org` and `project`.
///
/// # Panics
///
/// Panics if the endpoint URI cannot be built or the connection fails.
async fn pod_client_with_meta(
    addr: &str,
    org: &str,
    project: &str,
) -> PodServiceClient<InterceptedService<Channel, TenantInterceptor>> {
    let tenant = TenantInterceptor {
        org: org.to_owned(),
        project: project.to_owned(),
    };

    let uri = format!("http://{}", addr);
    let endpoint = Channel::from_shared(uri).unwrap();
    let channel = endpoint.connect().await.unwrap();

    PodServiceClient::with_interceptor(channel, tenant)
}
/// Test that CreditService admission control denies Pod creation when balance insufficient
#[tokio::test]
async fn creditservice_pod_admission_control_deny() {
// 1. Start CreditService
let credit_addr = "127.0.0.1:50095";
let storage: Arc<dyn CreditStorage> = InMemoryStorage::new();
let credit_svc = CreditServiceImpl::new(storage.clone());
tokio::spawn(async move {
Server::builder()
.add_service(CreditServiceServer::new(credit_svc))
.serve(credit_addr.parse().unwrap())
.await
.unwrap();
});
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
// 2. Create wallet with ZERO balance (should deny all requests)
let mut credit_client = CreditServiceClient::connect(format!("http://{}", credit_addr))
.await
.unwrap();
let _wallet = credit_client
.create_wallet("proj1", "org1", 0)
.await
.unwrap();
// 3. Set CREDITSERVICE_ENDPOINT for k8shost to connect
std::env::set_var("CREDITSERVICE_ENDPOINT", format!("http://{}", credit_addr));
// 4. Start k8shost Pod Service
let k8shost_addr = "127.0.0.1:50096";
// Use in-memory FlareDB for testing
let flaredb_storage = Arc::new(
Storage::new_direct("127.0.0.1:0".to_string())
.await
.unwrap_or_else(|_| panic!("Failed to create storage")),
);
let pod_svc = PodServiceImpl::new_with_credit_service(flaredb_storage).await;
tokio::spawn(async move {
Server::builder()
.add_service(k8shost_proto::pod_service_server::PodServiceServer::new(pod_svc))
.serve(k8shost_addr.parse().unwrap())
.await
.unwrap();
});
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
// 5. Try to create Pod - should fail with resource_exhausted
let mut pod_client = pod_client_with_meta(k8shost_addr, "org1", "proj1").await;
let mut requests = HashMap::new();
requests.insert("cpu".to_string(), "200m".to_string()); // 200 millicores
requests.insert("memory".to_string(), "256Mi".to_string()); // 256 MiB
let result = pod_client
.create_pod(CreatePodRequest {
pod: Some(ProtoPod {
metadata: Some(ProtoObjectMeta {
name: "test-pod".into(),
namespace: Some("default".into()),
org_id: Some("org1".into()),
project_id: Some("proj1".into()),
uid: None,
resource_version: None,
creation_timestamp: None,
labels: HashMap::new(),
annotations: HashMap::new(),
}),
spec: Some(ProtoPodSpec {
containers: vec![ProtoContainer {
name: "nginx".into(),
image: "nginx:latest".into(),
command: vec![],
args: vec![],
ports: vec![],
env: vec![],
}],
restart_policy: Some("Always".into()),
node_name: None,
}),
status: None,
}),
})
.await;
// Should fail with resource_exhausted (insufficient balance)
assert!(result.is_err());
let err = result.unwrap_err();
assert_eq!(
err.code(),
tonic::Code::ResourceExhausted,
"Expected ResourceExhausted, got: {:?}",
err
);
assert!(
err.message().contains("Admission denied"),
"Expected 'Admission denied' message, got: {}",
err.message()
);
// Clean up
std::env::remove_var("CREDITSERVICE_ENDPOINT");
}
/// Test that CreditService admission control allows Pod creation with sufficient balance
#[tokio::test]
async fn creditservice_pod_admission_control_allow() {
// 1. Start CreditService
let credit_addr = "127.0.0.1:50097";
let storage: Arc<dyn CreditStorage> = InMemoryStorage::new();
let credit_svc = CreditServiceImpl::new(storage.clone());
tokio::spawn(async move {
Server::builder()
.add_service(CreditServiceServer::new(credit_svc))
.serve(credit_addr.parse().unwrap())
.await
.unwrap();
});
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
// 2. Create wallet with sufficient balance
// Pod cost = cpu_millicores * 10 / 1000 + memory_gb * 5
// For 200m CPU and 256Mi memory: (200 * 10 / 1000) + (1 * 5) = 2 + 5 = 7 credits (rounded up)
let mut credit_client = CreditServiceClient::connect(format!("http://{}", credit_addr))
.await
.unwrap();
let wallet = credit_client
.create_wallet("proj2", "org2", 1000)
.await
.unwrap();
assert_eq!(wallet.balance, 1000);
// 3. Set CREDITSERVICE_ENDPOINT for k8shost to connect
std::env::set_var("CREDITSERVICE_ENDPOINT", format!("http://{}", credit_addr));
// 4. Start k8shost Pod Service
let k8shost_addr = "127.0.0.1:50098";
let flaredb_storage = Arc::new(
Storage::new_direct("127.0.0.1:0".to_string())
.await
.unwrap_or_else(|_| panic!("Failed to create storage")),
);
let pod_svc = PodServiceImpl::new_with_credit_service(flaredb_storage).await;
tokio::spawn(async move {
Server::builder()
.add_service(k8shost_proto::pod_service_server::PodServiceServer::new(pod_svc))
.serve(k8shost_addr.parse().unwrap())
.await
.unwrap();
});
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
// 5. Create Pod - should succeed
let mut pod_client = pod_client_with_meta(k8shost_addr, "org2", "proj2").await;
let pod = pod_client
.create_pod(CreatePodRequest {
pod: Some(ProtoPod {
metadata: Some(ProtoObjectMeta {
name: "test-pod-allowed".into(),
namespace: Some("default".into()),
org_id: Some("org2".into()),
project_id: Some("proj2".into()),
uid: None,
resource_version: None,
creation_timestamp: None,
labels: HashMap::new(),
annotations: HashMap::new(),
}),
spec: Some(ProtoPodSpec {
containers: vec![ProtoContainer {
name: "nginx".into(),
image: "nginx:latest".into(),
command: vec![],
args: vec![],
ports: vec![],
env: vec![],
}],
restart_policy: Some("Always".into()),
node_name: None,
}),
status: None,
}),
})
.await
.unwrap()
.into_inner();
assert!(pod.pod.is_some());
let created_pod = pod.pod.unwrap();
assert!(created_pod.metadata.is_some());
assert_eq!(created_pod.metadata.as_ref().unwrap().name, "test-pod-allowed");
// 6. Verify balance was deducted after commit
let wallet_after = credit_client.get_wallet("proj2").await.unwrap();
assert!(
wallet_after.balance < 1000,
"Balance should be reduced after Pod creation"
);
// Clean up
std::env::remove_var("CREDITSERVICE_ENDPOINT");
}
/// Smoke test of the CreditService client on its own, with no Pod service
/// involved: wallet creation, quota check, reserve -> commit, and
/// reserve -> release.
#[tokio::test]
async fn creditservice_pod_client_integration_smoke() {
    let credit_addr = "127.0.0.1:50099";

    // --- Bring up an in-process CreditService -----------------------------
    let backing_store: Arc<dyn CreditStorage> = InMemoryStorage::new();
    let service = CreditServiceImpl::new(backing_store.clone());
    let server_task = tokio::spawn(async move {
        Server::builder()
            .add_service(CreditServiceServer::new(service))
            .serve(credit_addr.parse().unwrap())
            .await
            .unwrap();
    });
    // Give the spawned server a moment to start listening.
    let startup_delay = std::time::Duration::from_millis(100);
    tokio::time::sleep(startup_delay).await;

    // --- Talk to it through the client ------------------------------------
    let endpoint = format!("http://{}", credit_addr);
    let mut credits = CreditServiceClient::connect(endpoint).await.unwrap();

    // A fresh wallet reports the project and balance it was created with.
    let initial_wallet = credits
        .create_wallet("test-proj", "test-org", 500)
        .await
        .unwrap();
    assert_eq!(initial_wallet.project_id, "test-proj");
    assert_eq!(initial_wallet.balance, 500);

    // The quota check is expected to be allowed.
    let quota = credits
        .check_quota(
            "test-proj",
            creditservice_client::ResourceType::K8sNode,
            1,
            10,
        )
        .await
        .unwrap();
    assert!(quota.allowed);

    // Phase 1: reserve credits for the Pod.
    let first_hold = credits
        .reserve_credits("test-proj", 10, "Test Pod creation", "PodInstance", 300)
        .await
        .unwrap();
    assert!(!first_hold.id.is_empty());

    // Phase 2: commit the reservation; a transaction must come back.
    let committed = credits
        .commit_reservation(&first_hold.id, 10, "pod-123")
        .await
        .unwrap();
    assert!(
        committed.transaction.is_some(),
        "Commit should create a transaction"
    );

    // 500 - 10 committed credits.
    let after_commit = credits.get_wallet("test-proj").await.unwrap();
    assert_eq!(after_commit.balance, 490);

    // --- Rollback path: reserve, then release -----------------------------
    let second_hold = credits
        .reserve_credits("test-proj", 20, "Test Pod creation 2", "PodInstance", 300)
        .await
        .unwrap();
    let was_released = credits
        .release_reservation(&second_hold.id, "pod creation failed")
        .await
        .unwrap();
    assert!(was_released);

    // A released reservation must leave the balance at 490.
    let after_release = credits.get_wallet("test-proj").await.unwrap();
    assert_eq!(after_release.balance, 490);

    // Stop the background server task.
    server_task.abort();
}