photoncloud-monorepo/prismnet/crates/prismnet-server/src/metadata.rs
centra 3eeb303dcb feat: Batch commit for T039.S3 deployment
Includes all pending changes needed for nixos-anywhere:
- fiberlb: L7 policy, rule, certificate types
- deployer: New service for cluster management
- nix-nos: Generic network modules
- Various service updates and fixes

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-13 04:34:51 +09:00

1159 lines
38 KiB
Rust

//! Network metadata storage using ChainFire or in-memory store
use chainfire_client::Client as ChainFireClient;
use dashmap::DashMap;
use prismnet_types::{
IPAllocation, Port, PortId, SecurityGroup, SecurityGroupId, SecurityGroupRule,
SecurityGroupRuleId, ServiceIPPool, ServiceIPPoolId, Subnet, SubnetId, Vpc, VpcId,
};
use std::sync::Arc;
use tokio::sync::Mutex;
/// Result type for metadata operations
pub type Result<T> = std::result::Result<T, MetadataError>;
/// Metadata operation error
#[derive(Debug, thiserror::Error)]
pub enum MetadataError {
#[error("Storage error: {0}")]
Storage(String),
#[error("Serialization error: {0}")]
Serialization(String),
#[error("Not found: {0}")]
NotFound(String),
#[error("Invalid argument: {0}")]
InvalidArgument(String),
}
/// Storage backend enum
enum StorageBackend {
ChainFire(Arc<Mutex<ChainFireClient>>),
InMemory(Arc<DashMap<String, String>>),
}
/// Central metadata store for all network resources
pub struct NetworkMetadataStore {
backend: StorageBackend,
}
impl NetworkMetadataStore {
/// Create a new metadata store with ChainFire backend
pub async fn new(endpoint: Option<String>) -> Result<Self> {
let endpoint = endpoint.unwrap_or_else(|| {
std::env::var("NOVANET_CHAINFIRE_ENDPOINT")
.unwrap_or_else(|_| "http://127.0.0.1:50051".to_string())
});
let client = ChainFireClient::connect(&endpoint).await.map_err(|e| {
MetadataError::Storage(format!("Failed to connect to ChainFire: {}", e))
})?;
Ok(Self {
backend: StorageBackend::ChainFire(Arc::new(Mutex::new(client))),
})
}
// Helper: find subnet by ID (scan) for validation paths
pub async fn find_subnet_by_id(&self, id: &SubnetId) -> Result<Option<Subnet>> {
let entries = self.get_prefix("/prismnet/subnets/").await?;
for (_, value) in entries {
if let Ok(subnet) = serde_json::from_str::<Subnet>(&value) {
if &subnet.id == id {
return Ok(Some(subnet));
}
}
}
Ok(None)
}
/// Create a new in-memory metadata store (for testing)
pub fn new_in_memory() -> Self {
Self {
backend: StorageBackend::InMemory(Arc::new(DashMap::new())),
}
}
// =========================================================================
// Internal storage helpers
// =========================================================================
async fn put(&self, key: &str, value: &str) -> Result<()> {
match &self.backend {
StorageBackend::ChainFire(client) => {
let mut c = client.lock().await;
c.put_str(key, value)
.await
.map_err(|e| MetadataError::Storage(format!("ChainFire put failed: {}", e)))?;
}
StorageBackend::InMemory(map) => {
map.insert(key.to_string(), value.to_string());
}
}
Ok(())
}
async fn get(&self, key: &str) -> Result<Option<String>> {
match &self.backend {
StorageBackend::ChainFire(client) => {
let mut c = client.lock().await;
c.get_str(key)
.await
.map_err(|e| MetadataError::Storage(format!("ChainFire get failed: {}", e)))
}
StorageBackend::InMemory(map) => Ok(map.get(key).map(|v| v.value().clone())),
}
}
async fn delete_key(&self, key: &str) -> Result<()> {
match &self.backend {
StorageBackend::ChainFire(client) => {
let mut c = client.lock().await;
c.delete(key).await.map_err(|e| {
MetadataError::Storage(format!("ChainFire delete failed: {}", e))
})?;
}
StorageBackend::InMemory(map) => {
map.remove(key);
}
}
Ok(())
}
async fn get_prefix(&self, prefix: &str) -> Result<Vec<(String, String)>> {
match &self.backend {
StorageBackend::ChainFire(client) => {
let mut c = client.lock().await;
let items = c.get_prefix(prefix).await.map_err(|e| {
MetadataError::Storage(format!("ChainFire get_prefix failed: {}", e))
})?;
Ok(items
.into_iter()
.map(|(k, v)| {
(
String::from_utf8_lossy(&k).to_string(),
String::from_utf8_lossy(&v).to_string(),
)
})
.collect())
}
StorageBackend::InMemory(map) => {
let mut results = Vec::new();
for entry in map.iter() {
if entry.key().starts_with(prefix) {
results.push((entry.key().clone(), entry.value().clone()));
}
}
Ok(results)
}
}
}
// =========================================================================
// Key builders
// =========================================================================
fn vpc_key(org_id: &str, project_id: &str, vpc_id: &VpcId) -> String {
format!("/prismnet/vpcs/{}/{}/{}", org_id, project_id, vpc_id)
}
fn vpc_prefix(org_id: &str, project_id: &str) -> String {
format!("/prismnet/vpcs/{}/{}/", org_id, project_id)
}
fn subnet_key(vpc_id: &VpcId, subnet_id: &SubnetId) -> String {
format!("/prismnet/subnets/{}/{}", vpc_id, subnet_id)
}
fn subnet_prefix(vpc_id: &VpcId) -> String {
format!("/prismnet/subnets/{}/", vpc_id)
}
fn port_key(subnet_id: &SubnetId, port_id: &PortId) -> String {
format!("/prismnet/ports/{}/{}", subnet_id, port_id)
}
fn port_prefix(subnet_id: &SubnetId) -> String {
format!("/prismnet/ports/{}/", subnet_id)
}
fn sg_key(org_id: &str, project_id: &str, sg_id: &SecurityGroupId) -> String {
format!(
"/prismnet/security_groups/{}/{}/{}",
org_id, project_id, sg_id
)
}
fn sg_prefix(org_id: &str, project_id: &str) -> String {
format!("/prismnet/security_groups/{}/{}/", org_id, project_id)
}
fn service_ip_pool_key(
org_id: &str,
project_id: &str,
pool_id: &ServiceIPPoolId,
) -> String {
format!("/prismnet/ipam/pools/{}/{}/{}", org_id, project_id, pool_id)
}
fn service_ip_pool_prefix(org_id: &str, project_id: &str) -> String {
format!("/prismnet/ipam/pools/{}/{}/", org_id, project_id)
}
fn ip_allocation_key(org_id: &str, project_id: &str, ip_address: &str) -> String {
format!(
"/prismnet/ipam/allocations/{}/{}/{}",
org_id, project_id, ip_address
)
}
fn ip_allocation_prefix(org_id: &str, project_id: &str) -> String {
format!("/prismnet/ipam/allocations/{}/{}/", org_id, project_id)
}
// =========================================================================
// VPC Operations
// =========================================================================
pub async fn create_vpc(&self, vpc: Vpc) -> Result<VpcId> {
let id = vpc.id;
let key = Self::vpc_key(&vpc.org_id, &vpc.project_id, &id);
let value =
serde_json::to_string(&vpc).map_err(|e| MetadataError::Serialization(e.to_string()))?;
self.put(&key, &value).await?;
Ok(id)
}
pub async fn get_vpc(&self, org_id: &str, project_id: &str, id: &VpcId) -> Result<Option<Vpc>> {
let key = Self::vpc_key(org_id, project_id, id);
if let Some(value) = self.get(&key).await? {
let vpc: Vpc = serde_json::from_str(&value)
.map_err(|e| MetadataError::Serialization(e.to_string()))?;
Ok(Some(vpc))
} else {
Ok(None)
}
}
pub async fn list_vpcs(&self, org_id: &str, project_id: &str) -> Result<Vec<Vpc>> {
let prefix = Self::vpc_prefix(org_id, project_id);
let entries = self.get_prefix(&prefix).await?;
let mut vpcs = Vec::new();
for (_, value) in entries {
if let Ok(vpc) = serde_json::from_str::<Vpc>(&value) {
vpcs.push(vpc);
}
}
Ok(vpcs)
}
pub async fn update_vpc(
&self,
org_id: &str,
project_id: &str,
id: &VpcId,
name: Option<String>,
description: Option<String>,
) -> Result<Option<Vpc>> {
let vpc_opt = self.get_vpc(org_id, project_id, id).await?;
if let Some(mut vpc) = vpc_opt {
if let Some(n) = name {
vpc.name = n;
}
if let Some(d) = description {
vpc.description = Some(d);
}
vpc.updated_at = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let key = Self::vpc_key(&vpc.org_id, &vpc.project_id, id);
let value = serde_json::to_string(&vpc)
.map_err(|e| MetadataError::Serialization(e.to_string()))?;
self.put(&key, &value).await?;
Ok(Some(vpc))
} else {
Ok(None)
}
}
pub async fn delete_vpc(
&self,
org_id: &str,
project_id: &str,
id: &VpcId,
) -> Result<Option<Vpc>> {
let vpc_opt = self.get_vpc(org_id, project_id, id).await?;
if let Some(vpc) = vpc_opt {
let key = Self::vpc_key(org_id, project_id, id);
self.delete_key(&key).await?;
Ok(Some(vpc))
} else {
Ok(None)
}
}
// =========================================================================
// Subnet Operations
// =========================================================================
pub async fn create_subnet(&self, subnet: Subnet) -> Result<SubnetId> {
let id = subnet.id;
let key = Self::subnet_key(&subnet.vpc_id, &id);
let value = serde_json::to_string(&subnet)
.map_err(|e| MetadataError::Serialization(e.to_string()))?;
self.put(&key, &value).await?;
Ok(id)
}
pub async fn get_subnet(&self, vpc_id: &VpcId, id: &SubnetId) -> Result<Option<Subnet>> {
let key = Self::subnet_key(vpc_id, id);
if let Some(value) = self.get(&key).await? {
let subnet: Subnet = serde_json::from_str(&value)
.map_err(|e| MetadataError::Serialization(e.to_string()))?;
Ok(Some(subnet))
} else {
Ok(None)
}
}
pub async fn list_subnets(
&self,
org_id: &str,
project_id: &str,
vpc_id: &VpcId,
) -> Result<Vec<Subnet>> {
// Ensure VPC belongs to tenant
if self.get_vpc(org_id, project_id, vpc_id).await?.is_none() {
return Ok(Vec::new());
}
let prefix = Self::subnet_prefix(vpc_id);
let entries = self.get_prefix(&prefix).await?;
let mut subnets = Vec::new();
for (_, value) in entries {
if let Ok(subnet) = serde_json::from_str::<Subnet>(&value) {
subnets.push(subnet);
}
}
Ok(subnets)
}
pub async fn update_subnet(
&self,
org_id: &str,
project_id: &str,
vpc_id: &VpcId,
id: &SubnetId,
name: Option<String>,
description: Option<String>,
dhcp_enabled: Option<bool>,
) -> Result<Option<Subnet>> {
let subnet_opt = self.find_subnet_by_id(id).await?;
if let Some(mut subnet) = subnet_opt {
// Verify ownership via parent VPC
let vpc = self
.get_vpc(org_id, project_id, &subnet.vpc_id)
.await?
.ok_or_else(|| MetadataError::NotFound("VPC not found for subnet".to_string()))?;
if vpc.id != *vpc_id {
return Ok(None);
}
if let Some(n) = name {
subnet.name = n;
}
if let Some(d) = description {
subnet.description = Some(d);
}
if let Some(dhcp) = dhcp_enabled {
subnet.dhcp_enabled = dhcp;
}
subnet.updated_at = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let key = Self::subnet_key(&subnet.vpc_id, id);
let value = serde_json::to_string(&subnet)
.map_err(|e| MetadataError::Serialization(e.to_string()))?;
self.put(&key, &value).await?;
Ok(Some(subnet))
} else {
Ok(None)
}
}
pub async fn delete_subnet(
&self,
org_id: &str,
project_id: &str,
vpc_id: &VpcId,
id: &SubnetId,
) -> Result<Option<Subnet>> {
let subnet_opt = self.find_subnet_by_id(id).await?;
if let Some(subnet) = subnet_opt {
let vpc = self
.get_vpc(org_id, project_id, &subnet.vpc_id)
.await?
.ok_or_else(|| MetadataError::NotFound("VPC not found for subnet".to_string()))?;
if vpc.id != *vpc_id {
return Ok(None);
}
let key = Self::subnet_key(&subnet.vpc_id, id);
self.delete_key(&key).await?;
Ok(Some(subnet))
} else {
Ok(None)
}
}
// =========================================================================
// Port Operations
// =========================================================================
pub async fn create_port(&self, port: Port) -> Result<PortId> {
let id = port.id;
let key = Self::port_key(&port.subnet_id, &id);
let value = serde_json::to_string(&port)
.map_err(|e| MetadataError::Serialization(e.to_string()))?;
self.put(&key, &value).await?;
Ok(id)
}
pub async fn get_port(&self, subnet_id: &SubnetId, id: &PortId) -> Result<Option<Port>> {
let key = Self::port_key(subnet_id, id);
if let Some(value) = self.get(&key).await? {
let port: Port = serde_json::from_str(&value)
.map_err(|e| MetadataError::Serialization(e.to_string()))?;
Ok(Some(port))
} else {
Ok(None)
}
}
pub async fn list_ports(
&self,
subnet_id: Option<&SubnetId>,
device_id: Option<&str>,
) -> Result<Vec<Port>> {
let prefix = if let Some(subnet_id) = subnet_id {
Self::port_prefix(subnet_id)
} else {
"/prismnet/ports/".to_string()
};
let entries = self.get_prefix(&prefix).await?;
let mut ports = Vec::new();
for (_, value) in entries {
if let Ok(port) = serde_json::from_str::<Port>(&value) {
if let Some(dev_id) = device_id {
if port.device_id.as_deref() == Some(dev_id) {
ports.push(port);
}
} else {
ports.push(port);
}
}
}
Ok(ports)
}
pub async fn update_port(
&self,
org_id: &str,
project_id: &str,
subnet_id: &SubnetId,
id: &PortId,
name: Option<String>,
description: Option<String>,
security_group_ids: Option<Vec<SecurityGroupId>>,
admin_state_up: Option<bool>,
) -> Result<Option<Port>> {
// Verify subnet belongs to tenant via VPC
let subnet_opt = self.find_subnet_by_id(subnet_id).await?;
if let Some(subnet) = subnet_opt {
if self
.get_vpc(org_id, project_id, &subnet.vpc_id)
.await?
.is_none()
{
return Ok(None);
}
} else {
return Ok(None);
}
let port_opt = self.get_port(subnet_id, id).await?;
if let Some(mut port) = port_opt {
if let Some(n) = name {
port.name = n;
}
if let Some(d) = description {
port.description = Some(d);
}
if let Some(sgs) = security_group_ids {
port.security_groups = sgs;
}
if let Some(admin) = admin_state_up {
port.admin_state_up = admin;
}
port.updated_at = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let key = Self::port_key(&port.subnet_id, id);
let value = serde_json::to_string(&port)
.map_err(|e| MetadataError::Serialization(e.to_string()))?;
self.put(&key, &value).await?;
Ok(Some(port))
} else {
Ok(None)
}
}
pub async fn delete_port(
&self,
org_id: &str,
project_id: &str,
subnet_id: &SubnetId,
id: &PortId,
) -> Result<Option<Port>> {
// Verify subnet belongs to tenant via VPC
let subnet_opt = self.find_subnet_by_id(subnet_id).await?;
if let Some(subnet) = subnet_opt {
if self
.get_vpc(org_id, project_id, &subnet.vpc_id)
.await?
.is_none()
{
return Ok(None);
}
} else {
return Ok(None);
}
let port_opt = self.get_port(subnet_id, id).await?;
if let Some(port) = port_opt {
let key = Self::port_key(&port.subnet_id, id);
self.delete_key(&key).await?;
Ok(Some(port))
} else {
Ok(None)
}
}
pub async fn attach_device(
&self,
port_id: &PortId,
subnet_id: &SubnetId,
device_id: String,
device_type: prismnet_types::DeviceType,
) -> Result<Option<Port>> {
let port_opt = self.get_port(subnet_id, port_id).await?;
if let Some(mut port) = port_opt {
port.device_id = Some(device_id);
port.device_type = device_type;
port.updated_at = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let key = Self::port_key(&port.subnet_id, port_id);
let value = serde_json::to_string(&port)
.map_err(|e| MetadataError::Serialization(e.to_string()))?;
self.put(&key, &value).await?;
Ok(Some(port))
} else {
Ok(None)
}
}
pub async fn detach_device(
&self,
port_id: &PortId,
subnet_id: &SubnetId,
) -> Result<Option<Port>> {
let port_opt = self.get_port(subnet_id, port_id).await?;
if let Some(mut port) = port_opt {
port.device_id = None;
port.device_type = prismnet_types::DeviceType::None;
port.updated_at = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let key = Self::port_key(&port.subnet_id, port_id);
let value = serde_json::to_string(&port)
.map_err(|e| MetadataError::Serialization(e.to_string()))?;
self.put(&key, &value).await?;
Ok(Some(port))
} else {
Ok(None)
}
}
// =========================================================================
// IP Address Management (IPAM)
// =========================================================================
/// Allocate next available IP address from subnet CIDR
pub async fn allocate_ip(
&self,
org_id: &str,
project_id: &str,
subnet_id: &SubnetId,
) -> Result<Option<String>> {
// locate subnet and verify tenant via parent VPC
let subnet = self
.find_subnet_by_id(subnet_id)
.await?
.ok_or_else(|| MetadataError::NotFound("Subnet not found".to_string()))?;
let vpc = self
.get_vpc(org_id, project_id, &subnet.vpc_id)
.await?
.ok_or_else(|| MetadataError::NotFound("VPC not found for subnet".to_string()))?;
if vpc.id != subnet.vpc_id {
return Ok(None);
}
// Parse CIDR to get network and available IPs
let allocated_ips: Vec<String> = self
.list_ports(Some(subnet_id), None)
.await?
.iter()
.filter_map(|p| p.ip_address.clone())
.collect();
Ok(self.find_next_available_ip(
&subnet.cidr_block,
&allocated_ips,
subnet.gateway_ip.as_deref(),
))
}
/// Find next available IP in CIDR, avoiding gateway and allocated IPs
fn find_next_available_ip(
&self,
cidr: &str,
allocated: &[String],
gateway: Option<&str>,
) -> Option<String> {
// Parse CIDR (e.g., "10.0.1.0/24")
let parts: Vec<&str> = cidr.split('/').collect();
if parts.len() != 2 {
return None;
}
let base_ip = parts[0];
let prefix_len: u32 = parts[1].parse().ok()?;
// Parse base IP octets
let octets: Vec<u8> = base_ip.split('.').filter_map(|s| s.parse().ok()).collect();
if octets.len() != 4 {
return None;
}
let base_u32 = ((octets[0] as u32) << 24)
| ((octets[1] as u32) << 16)
| ((octets[2] as u32) << 8)
| (octets[3] as u32);
// Calculate usable IP range
let host_bits = 32 - prefix_len;
let max_hosts = (1u32 << host_bits) - 2; // Exclude network and broadcast
// Try to allocate from .10 onwards (skip .1-.9 for common services)
for offset in 10..=max_hosts {
let ip_u32 = base_u32 + offset;
let ip = format!(
"{}.{}.{}.{}",
(ip_u32 >> 24) & 0xFF,
(ip_u32 >> 16) & 0xFF,
(ip_u32 >> 8) & 0xFF,
ip_u32 & 0xFF
);
// Skip if gateway or already allocated
if Some(ip.as_str()) == gateway || allocated.contains(&ip) {
continue;
}
return Some(ip);
}
None
}
// =========================================================================
// Service IP Pool Operations (IPAM)
// =========================================================================
pub async fn create_service_ip_pool(&self, pool: ServiceIPPool) -> Result<ServiceIPPoolId> {
let id = pool.id;
let key = Self::service_ip_pool_key(&pool.org_id, &pool.project_id, &id);
let value = serde_json::to_string(&pool)
.map_err(|e| MetadataError::Serialization(e.to_string()))?;
self.put(&key, &value).await?;
Ok(id)
}
pub async fn get_service_ip_pool(
&self,
org_id: &str,
project_id: &str,
pool_id: &ServiceIPPoolId,
) -> Result<Option<ServiceIPPool>> {
let key = Self::service_ip_pool_key(org_id, project_id, pool_id);
if let Some(value) = self.get(&key).await? {
let pool: ServiceIPPool = serde_json::from_str(&value)
.map_err(|e| MetadataError::Serialization(e.to_string()))?;
Ok(Some(pool))
} else {
Ok(None)
}
}
pub async fn list_service_ip_pools(
&self,
org_id: &str,
project_id: &str,
) -> Result<Vec<ServiceIPPool>> {
let prefix = Self::service_ip_pool_prefix(org_id, project_id);
let entries = self.get_prefix(&prefix).await?;
let mut pools = Vec::new();
for (_, value) in entries {
if let Ok(pool) = serde_json::from_str::<ServiceIPPool>(&value) {
pools.push(pool);
}
}
Ok(pools)
}
pub async fn allocate_service_ip(
&self,
pool_id: &ServiceIPPoolId,
ip_address: &str,
allocation: IPAllocation,
) -> Result<()> {
// Load pool to find org_id/project_id (scan approach)
let prefix = "/prismnet/ipam/pools/";
let entries = self.get_prefix(prefix).await?;
let mut pool_opt: Option<ServiceIPPool> = None;
let mut pool_key = String::new();
for (key, value) in entries {
if let Ok(pool) = serde_json::from_str::<ServiceIPPool>(&value) {
if &pool.id == pool_id {
pool_opt = Some(pool);
pool_key = key;
break;
}
}
}
let mut pool = pool_opt
.ok_or_else(|| MetadataError::NotFound("Service IP Pool not found".to_string()))?;
// Update pool with allocated IP
pool.allocate_ip(ip_address.to_string());
let pool_value = serde_json::to_string(&pool)
.map_err(|e| MetadataError::Serialization(e.to_string()))?;
self.put(&pool_key, &pool_value).await?;
// Save allocation record
let allocation_key =
Self::ip_allocation_key(&allocation.org_id, &allocation.project_id, ip_address);
let allocation_value = serde_json::to_string(&allocation)
.map_err(|e| MetadataError::Serialization(e.to_string()))?;
self.put(&allocation_key, &allocation_value).await?;
Ok(())
}
pub async fn release_service_ip(
&self,
org_id: &str,
project_id: &str,
ip_address: &str,
) -> Result<()> {
// Get allocation to find pool
let allocation = self
.get_ip_allocation(org_id, project_id, ip_address)
.await?
.ok_or_else(|| MetadataError::NotFound("IP allocation not found".to_string()))?;
// Load and update pool
let prefix = "/prismnet/ipam/pools/";
let entries = self.get_prefix(prefix).await?;
for (key, value) in entries {
if let Ok(mut pool) = serde_json::from_str::<ServiceIPPool>(&value) {
if pool.id == allocation.pool_id {
pool.release_ip(ip_address);
let pool_value = serde_json::to_string(&pool)
.map_err(|e| MetadataError::Serialization(e.to_string()))?;
self.put(&key, &pool_value).await?;
break;
}
}
}
// Delete allocation record
let allocation_key = Self::ip_allocation_key(org_id, project_id, ip_address);
self.delete_key(&allocation_key).await?;
Ok(())
}
pub async fn get_ip_allocation(
&self,
org_id: &str,
project_id: &str,
ip_address: &str,
) -> Result<Option<IPAllocation>> {
let key = Self::ip_allocation_key(org_id, project_id, ip_address);
if let Some(value) = self.get(&key).await? {
let allocation: IPAllocation = serde_json::from_str(&value)
.map_err(|e| MetadataError::Serialization(e.to_string()))?;
Ok(Some(allocation))
} else {
Ok(None)
}
}
// =========================================================================
// Security Group Operations
// =========================================================================
pub async fn create_security_group(&self, sg: SecurityGroup) -> Result<SecurityGroupId> {
let id = sg.id;
let key = Self::sg_key(&sg.org_id, &sg.project_id, &id);
let value =
serde_json::to_string(&sg).map_err(|e| MetadataError::Serialization(e.to_string()))?;
self.put(&key, &value).await?;
Ok(id)
}
pub async fn get_security_group(
&self,
org_id: &str,
project_id: &str,
id: &SecurityGroupId,
) -> Result<Option<SecurityGroup>> {
let key = Self::sg_key(org_id, project_id, id);
if let Some(value) = self.get(&key).await? {
let sg: SecurityGroup = serde_json::from_str(&value)
.map_err(|e| MetadataError::Serialization(e.to_string()))?;
if sg.org_id != org_id || sg.project_id != project_id {
return Ok(None);
}
Ok(Some(sg))
} else {
Ok(None)
}
}
pub async fn list_security_groups(
&self,
org_id: &str,
project_id: &str,
) -> Result<Vec<SecurityGroup>> {
let prefix = Self::sg_prefix(org_id, project_id);
let entries = self.get_prefix(&prefix).await?;
let mut sgs = Vec::new();
for (_, value) in entries {
if let Ok(sg) = serde_json::from_str::<SecurityGroup>(&value) {
sgs.push(sg);
}
}
Ok(sgs)
}
pub async fn update_security_group(
&self,
org_id: &str,
project_id: &str,
id: &SecurityGroupId,
name: Option<String>,
description: Option<String>,
) -> Result<Option<SecurityGroup>> {
let sg_opt = self.get_security_group(org_id, project_id, id).await?;
if let Some(mut sg) = sg_opt {
if let Some(n) = name {
sg.name = n;
}
if let Some(d) = description {
sg.description = Some(d);
}
sg.updated_at = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let key = Self::sg_key(&sg.org_id, &sg.project_id, id);
let value = serde_json::to_string(&sg)
.map_err(|e| MetadataError::Serialization(e.to_string()))?;
self.put(&key, &value).await?;
Ok(Some(sg))
} else {
Ok(None)
}
}
pub async fn delete_security_group(
&self,
org_id: &str,
project_id: &str,
id: &SecurityGroupId,
) -> Result<Option<SecurityGroup>> {
let sg_opt = self.get_security_group(org_id, project_id, id).await?;
if let Some(sg) = sg_opt {
let key = Self::sg_key(&sg.org_id, &sg.project_id, id);
self.delete_key(&key).await?;
Ok(Some(sg))
} else {
Ok(None)
}
}
pub async fn add_security_group_rule(
&self,
org_id: &str,
project_id: &str,
sg_id: &SecurityGroupId,
rule: SecurityGroupRule,
) -> Result<Option<SecurityGroupRule>> {
let sg_opt = self.get_security_group(org_id, project_id, sg_id).await?;
if let Some(mut sg) = sg_opt {
sg.add_rule(rule.clone());
let key = Self::sg_key(&sg.org_id, &sg.project_id, sg_id);
let value = serde_json::to_string(&sg)
.map_err(|e| MetadataError::Serialization(e.to_string()))?;
self.put(&key, &value).await?;
Ok(Some(rule))
} else {
Ok(None)
}
}
pub async fn remove_security_group_rule(
&self,
org_id: &str,
project_id: &str,
sg_id: &SecurityGroupId,
rule_id: &SecurityGroupRuleId,
) -> Result<Option<SecurityGroupRule>> {
let sg_opt = self.get_security_group(org_id, project_id, sg_id).await?;
if let Some(mut sg) = sg_opt {
let removed = sg.remove_rule(rule_id);
if removed.is_some() {
let key = Self::sg_key(&sg.org_id, &sg.project_id, sg_id);
let value = serde_json::to_string(&sg)
.map_err(|e| MetadataError::Serialization(e.to_string()))?;
self.put(&key, &value).await?;
}
Ok(removed)
} else {
Ok(None)
}
}
}
impl Default for NetworkMetadataStore {
fn default() -> Self {
Self::new_in_memory()
}
}
#[cfg(test)]
mod tests {
use super::*;
use prismnet_types::{IpProtocol, RuleDirection, SecurityGroup, SecurityGroupRule, Vpc};
#[tokio::test]
async fn test_vpc_crud() {
let store = NetworkMetadataStore::new_in_memory();
let vpc = Vpc::new("test-vpc", "org-1", "proj-1", "10.0.0.0/16");
let id = store.create_vpc(vpc.clone()).await.unwrap();
let retrieved = store
.get_vpc("org-1", "proj-1", &id)
.await
.unwrap()
.unwrap();
assert_eq!(retrieved.name, "test-vpc");
store
.update_vpc(
"org-1",
"proj-1",
&id,
Some("updated-vpc".to_string()),
None,
)
.await
.unwrap();
let updated = store
.get_vpc("org-1", "proj-1", &id)
.await
.unwrap()
.unwrap();
assert_eq!(updated.name, "updated-vpc");
let deleted = store.delete_vpc("org-1", "proj-1", &id).await.unwrap();
assert!(deleted.is_some());
assert!(store
.get_vpc("org-1", "proj-1", &id)
.await
.unwrap()
.is_none());
}
#[tokio::test]
async fn test_vpc_isolation() {
let store = NetworkMetadataStore::new_in_memory();
let vpc_a = Vpc::new("vpc-a", "org-a", "proj-a", "10.0.0.0/16");
let vpc_b = Vpc::new("vpc-b", "org-b", "proj-b", "10.1.0.0/16");
store.create_vpc(vpc_a).await.unwrap();
store.create_vpc(vpc_b).await.unwrap();
let list_a = store.list_vpcs("org-a", "proj-a").await.unwrap();
let list_b = store.list_vpcs("org-b", "proj-b").await.unwrap();
assert_eq!(list_a.len(), 1);
assert_eq!(list_a[0].org_id, "org-a");
assert_eq!(list_b.len(), 1);
assert_eq!(list_b[0].org_id, "org-b");
}
#[tokio::test]
async fn test_cross_tenant_delete_denied() {
let store = NetworkMetadataStore::new_in_memory();
let vpc = Vpc::new("vpc-a", "org-a", "proj-a", "10.0.0.0/16");
let vpc_id = store.create_vpc(vpc).await.unwrap();
let result = store.delete_vpc("org-b", "proj-b", &vpc_id).await.unwrap();
assert!(result.is_none());
let still_exists = store.get_vpc("org-a", "proj-a", &vpc_id).await.unwrap();
assert!(still_exists.is_some());
}
#[tokio::test]
async fn test_subnet_crud() {
let store = NetworkMetadataStore::new_in_memory();
let vpc = Vpc::new("test-vpc", "org-1", "proj-1", "10.0.0.0/16");
let vpc_id = store.create_vpc(vpc).await.unwrap();
let mut subnet = prismnet_types::Subnet::new("test-subnet", vpc_id, "10.0.1.0/24");
subnet.gateway_ip = Some("10.0.1.1".to_string());
let subnet_id = store.create_subnet(subnet).await.unwrap();
let retrieved = store
.get_subnet(&vpc_id, &subnet_id)
.await
.unwrap()
.unwrap();
assert_eq!(retrieved.name, "test-subnet");
let subnets = store
.list_subnets("org-1", "proj-1", &vpc_id)
.await
.unwrap();
assert_eq!(subnets.len(), 1);
}
#[tokio::test]
async fn test_port_crud() {
let store = NetworkMetadataStore::new_in_memory();
let vpc = Vpc::new("test-vpc", "org-1", "proj-1", "10.0.0.0/16");
let vpc_id = store.create_vpc(vpc).await.unwrap();
let mut subnet = prismnet_types::Subnet::new("test-subnet", vpc_id, "10.0.1.0/24");
subnet.gateway_ip = Some("10.0.1.1".to_string());
let subnet_id = store.create_subnet(subnet).await.unwrap();
let port = prismnet_types::Port::new("test-port", subnet_id);
let port_id = store.create_port(port).await.unwrap();
let retrieved = store.get_port(&subnet_id, &port_id).await.unwrap().unwrap();
assert_eq!(retrieved.name, "test-port");
}
#[tokio::test]
async fn test_security_group_crud() {
let store = NetworkMetadataStore::new_in_memory();
let sg = SecurityGroup::new("default", "org-1", "proj-1");
let sg_id = store.create_security_group(sg).await.unwrap();
let retrieved = store
.get_security_group("org-1", "proj-1", &sg_id)
.await
.unwrap()
.unwrap();
assert_eq!(retrieved.name, "default");
assert_eq!(retrieved.rules.len(), 1); // Default egress rule
// Add rule
let rule = SecurityGroupRule::new(sg_id, RuleDirection::Ingress, IpProtocol::Tcp);
store
.add_security_group_rule("org-1", "proj-1", &sg_id, rule.clone())
.await
.unwrap();
let updated = store
.get_security_group("org-1", "proj-1", &sg_id)
.await
.unwrap()
.unwrap();
assert_eq!(updated.rules.len(), 2);
}
#[tokio::test]
async fn test_ip_allocation() {
let store = NetworkMetadataStore::new_in_memory();
let vpc = Vpc::new("test-vpc", "org-1", "proj-1", "10.0.0.0/16");
let vpc_id = store.create_vpc(vpc).await.unwrap();
let mut subnet = prismnet_types::Subnet::new("test-subnet", vpc_id, "10.0.1.0/24");
subnet.gateway_ip = Some("10.0.1.1".to_string());
let subnet_id = store.create_subnet(subnet).await.unwrap();
// Allocate first IP
let ip1 = store
.allocate_ip("org-1", "proj-1", &subnet_id)
.await
.unwrap()
.unwrap();
assert_eq!(ip1, "10.0.1.10"); // First available IP (skipping .1-.9)
// Create port with allocated IP
let mut port1 = prismnet_types::Port::new("port1", subnet_id);
port1.ip_address = Some(ip1.clone());
store.create_port(port1).await.unwrap();
// Allocate second IP
let ip2 = store
.allocate_ip("org-1", "proj-1", &subnet_id)
.await
.unwrap()
.unwrap();
assert_eq!(ip2, "10.0.1.11"); // Next available
// Create port with second IP
let mut port2 = prismnet_types::Port::new("port2", subnet_id);
port2.ip_address = Some(ip2);
store.create_port(port2).await.unwrap();
// Gateway should be skipped
assert_ne!(ip1, "10.0.1.1");
}
}