T046: OpenRaft-Style Multi-Raft Core Library

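Design Approach

An OpenRaft-style, tick-driven design with Multi-Raft support built in from the start.

Key Principles:

  1. Tick-driven: no internal timers; the caller advances time by invoking tick().
  2. Ready pattern: the core performs no I/O; it returns the work to be done in a Ready struct.
  3. Multi-Raft native: efficient management of many groups is built into the design.
  4. Pure logic: the Raft core is pure logic, which makes it easy to test.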
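As a minimal sketch of the tick-driven principle, the election timeout can be held as a tick count and randomized from a caller-supplied seed. The `ElectionTimer` type, its fields, and the xorshift randomization are hypothetical illustrations, not part of this design. The point is that no clock or timer is involved, so the same seed always produces the same behavior in a test.

```rust
/// Hypothetical tick-driven election timer: no clock, no async.
/// Time only moves when the caller invokes `tick()`.
pub struct ElectionTimer {
    elapsed: u64,      // ticks since the last reset
    base_timeout: u64, // minimum election timeout, in ticks (must be > 0)
    randomized: u64,   // current timeout, drawn from [base_timeout, 2 * base_timeout)
    rng_state: u64,    // xorshift64 state: same seed, same timeouts
}

impl ElectionTimer {
    pub fn new(base_timeout: u64, seed: u64) -> Self {
        assert!(base_timeout > 0, "timeout must be at least one tick");
        let mut timer = ElectionTimer {
            elapsed: 0,
            base_timeout,
            randomized: base_timeout,
            rng_state: seed.max(1), // xorshift state must be non-zero
        };
        timer.reset();
        timer
    }

    /// Restart the countdown (on leader heartbeat, granted vote, or election start).
    pub fn reset(&mut self) {
        self.elapsed = 0;
        // One xorshift64 step to pick the next randomized timeout.
        let mut x = self.rng_state;
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        self.rng_state = x;
        self.randomized = self.base_timeout + x % self.base_timeout;
    }

    /// Advance one logical tick; returns true when an election should start.
    pub fn tick(&mut self) -> bool {
        self.elapsed += 1;
        if self.elapsed >= self.randomized {
            self.reset();
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut timer = ElectionTimer::new(10, 42);
    let mut ticks = 1;
    while !timer.tick() {
        ticks += 1;
    }
    println!("election due after {ticks} ticks");
}
```

Randomizing the timeout per reset is the usual Raft technique for avoiding split votes; seeding it explicitly keeps that randomness reproducible in sync tests.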

Architecture

┌─────────────────────────────────────────────────────────────┐
│                      raft-core crate                         │
│              (Pure Raft logic, no I/O)                       │
│                                                              │
│  ┌─────────────────────────────────────────────────────┐    │
│  │                    RaftCore<S>                       │    │
│  │                                                      │    │
│  │  tick() → Ready        // advance time (ticks)       │    │
│  │  step(msg) → Ready     // handle a message           │    │
│  │  propose(data) → Ready // client write               │    │
│  │  advance(applied)      // notify Ready processed     │    │
│  └─────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────┘
                            │
          ┌─────────────────┴─────────────────┐
          ▼                                   ▼
┌─────────────────────┐           ┌─────────────────────────┐
│   ChainFire         │           │      FlareDB            │
│   (Single Raft)     │           │    (Multi-Raft)         │
│                     │           │                         │
│  ┌───────────────┐  │           │  ┌───────────────────┐  │
│  │   RaftNode    │  │           │  │   MultiRaft       │  │
│  │   (async)     │  │           │  │   Coordinator     │  │
│  │               │  │           │  │                   │  │
│  │ - tokio timer │  │           │  │ - groups: HashMap │  │
│  │ - gRPC I/O    │  │           │  │ - batch messages  │  │
│  │ - RocksDB     │  │           │  │ - shared tick     │  │
│  └───────────────┘  │           │  └───────────────────┘  │
│         │           │           │           │             │
│    ┌────┴────┐      │           │    ┌──────┴──────┐      │
│    │RaftCore │      │           │    │RaftCore x N │      │
│    └─────────┘      │           │    └─────────────┘      │
└─────────────────────┘           └─────────────────────────┘

Core API

RaftCore (pure Raft logic)

```rust
use std::collections::HashMap;

/// Pure Raft state machine - no I/O, no async
pub struct RaftCore<S: Storage> {
    id: NodeId,
    peers: Vec<NodeId>,
    // Persistent state
    current_term: u64,
    voted_for: Option<NodeId>,
    log: Vec<LogEntry>,
    // Volatile state
    commit_index: u64,
    last_applied: u64,
    role: Role,
    leader_id: Option<NodeId>, // last known leader, reported by leader()
    // Leader state
    next_index: HashMap<NodeId, u64>,
    match_index: HashMap<NodeId, u64>,
    // Timing (tick counts, not wall clock)
    election_elapsed: u64,
    heartbeat_elapsed: u64,
    // Storage abstraction
    storage: S,
}

// Public API (signatures only; bodies are omitted in this design)
impl<S: Storage> RaftCore<S> {
    /// Create new Raft instance
    pub fn new(id: NodeId, peers: Vec<NodeId>, storage: S) -> Self;

    /// Advance logical time by one tick
    /// Returns Ready with actions to take (election, heartbeat, etc.)
    pub fn tick(&mut self) -> Ready;

    /// Process incoming Raft message
    pub fn step(&mut self, msg: Message) -> Ready;

    /// Propose new entry (leader only)
    pub fn propose(&mut self, data: Vec<u8>) -> Result<Ready, NotLeader>;

    /// Notify that Ready actions have been processed
    pub fn advance(&mut self, applied: Applied);

    /// Check if this node is leader
    pub fn is_leader(&self) -> bool;

    /// Get current leader (if known)
    pub fn leader(&self) -> Option<NodeId>;
}
```
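The leader-side fields `match_index` and `commit_index` interact through one key rule: an index is committed once a majority of voters (leader included) have replicated it, and a leader only advances `commit_index` directly for an entry from its own current term (Raft paper, §5.4.2). A minimal standalone sketch, assuming the leader's own last log index is passed in separately and `term_at` is a hypothetical lookup (for example backed by `Storage::term`):

```rust
use std::collections::HashMap;

type NodeId = u64;

/// Hypothetical helper: compute a leader's new commit index.
/// `match_index` holds followers only; the leader counts via `leader_last_index`.
fn advance_commit_index(
    commit_index: u64,
    current_term: u64,
    leader_last_index: u64,
    match_index: &HashMap<NodeId, u64>,
    term_at: impl Fn(u64) -> Option<u64>,
) -> u64 {
    // Replicated index for every voter, leader included.
    let mut indexes: Vec<u64> = match_index.values().copied().collect();
    indexes.push(leader_last_index);
    // Sorted descending, the value at position n / 2 is held by n / 2 + 1 voters (a majority).
    indexes.sort_unstable_by(|a, b| b.cmp(a));
    let majority_index = indexes[indexes.len() / 2];
    // Only commit current-term entries directly (Raft §5.4.2); never move backwards.
    if majority_index > commit_index && term_at(majority_index) == Some(current_term) {
        majority_index
    } else {
        commit_index
    }
}

fn main() {
    let mut match_index = HashMap::new();
    match_index.insert(2, 5);
    match_index.insert(3, 3);
    // Leader holds 7 entries, all from term 2.
    let commit = advance_commit_index(0, 2, 7, &match_index, |_| Some(2));
    println!("commit_index = {commit}");
}
```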

Ready (output actions)

```rust
/// Actions to be executed by the caller (I/O layer)
#[derive(Default)]
pub struct Ready {
    /// Messages to send to other nodes
    pub messages: Vec<(NodeId, Message)>,

    /// Entries to append to log storage
    pub entries_to_persist: Vec<LogEntry>,

    /// Hard state to persist (term, voted_for)
    pub hard_state: Option<HardState>,

    /// Committed entries ready to apply to state machine
    pub committed_entries: Vec<LogEntry>,

    /// Snapshot to install (if any)
    pub snapshot: Option<Snapshot>,

    /// Soft state changes (leader, role) - for notification only
    pub soft_state: Option<SoftState>,
}

// Signatures only; bodies are omitted in this design
impl Ready {
    /// Check if there are any actions to take
    pub fn is_empty(&self) -> bool;

    /// Merge another Ready into this one
    pub fn merge(&mut self, other: Ready);
}
```
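`is_empty` and `merge` are what a driver needs to skip idle rounds and to coalesce several `tick`/`step` results into one I/O batch. A minimal sketch with the payload types reduced to placeholders (the simplified `LogEntry`, `HardState`, and `Message` here are assumptions, and `snapshot`/`soft_state` are left out). Two choices are worth noting: the later `hard_state` wins because it is a full (term, voted_for) value, and a later batch of entries replaces any conflicting suffix from an earlier one.

```rust
// Simplified placeholder types (assumed for this sketch).
#[derive(Debug, Clone, PartialEq)]
pub struct LogEntry { pub index: u64, pub term: u64, pub data: Vec<u8> }
#[derive(Debug, Clone, PartialEq)]
pub struct HardState { pub term: u64, pub voted_for: Option<u64> }
#[derive(Debug, Clone, PartialEq)]
pub struct Message(pub String);

#[derive(Default, Debug)]
pub struct Ready {
    pub messages: Vec<(u64, Message)>,
    pub entries_to_persist: Vec<LogEntry>,
    pub hard_state: Option<HardState>,
    pub committed_entries: Vec<LogEntry>,
}

impl Ready {
    /// True when there is nothing to persist, send, or apply.
    pub fn is_empty(&self) -> bool {
        self.messages.is_empty()
            && self.entries_to_persist.is_empty()
            && self.hard_state.is_none()
            && self.committed_entries.is_empty()
    }

    /// Fold `other` (produced later) into `self`.
    pub fn merge(&mut self, other: Ready) {
        self.messages.extend(other.messages);
        // A later batch may overwrite a conflicting suffix: drop entries from its first index on.
        if let Some(first) = other.entries_to_persist.first() {
            let cut = first.index;
            self.entries_to_persist.retain(|e| e.index < cut);
        }
        self.entries_to_persist.extend(other.entries_to_persist);
        // Hard state is a complete value, so the newest one wins.
        if other.hard_state.is_some() {
            self.hard_state = other.hard_state;
        }
        self.committed_entries.extend(other.committed_entries);
    }
}

fn main() {
    let mut ready = Ready::default();
    println!("empty: {}", ready.is_empty());
    ready.merge(Ready { messages: vec![(2, Message("heartbeat".into()))], ..Default::default() });
    println!("empty after merge: {}", ready.is_empty());
}
```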

Storage Trait

```rust
/// Storage abstraction - caller provides implementation
pub trait Storage {
    /// Get persisted hard state
    fn hard_state(&self) -> HardState;

    /// Get log entries in range [start, end)
    fn entries(&self, start: u64, end: u64) -> Vec<LogEntry>;

    /// Get term at given index (None if no entry exists at that index)
    fn term(&self, index: u64) -> Option<u64>;

    /// Get last log index
    fn last_index(&self) -> u64;

    /// Get first log index (after compaction)
    fn first_index(&self) -> u64;

    /// Get snapshot metadata (if any)
    fn snapshot(&self) -> Option<SnapshotMeta>;
}
```
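For the synchronous tests this design aims for, an in-memory implementation of the trait is enough. A minimal sketch, assuming simplified `LogEntry`/`HardState`/`SnapshotMeta` types, 1-based contiguous log indexes, no compaction (`first_index()` is always 1), and no snapshots. `MemStorage` and its `append` helper are hypothetical; treating index 0 as a sentinel with term 0 is also an assumption, chosen so the "previous entry" check works for an empty log.

```rust
#[derive(Debug, Clone, PartialEq)]
pub struct LogEntry { pub index: u64, pub term: u64, pub data: Vec<u8> }
#[derive(Debug, Clone, Default, PartialEq)]
pub struct HardState { pub term: u64, pub voted_for: Option<u64> }
#[derive(Debug, Clone)]
pub struct SnapshotMeta { pub index: u64, pub term: u64 }

pub trait Storage {
    fn hard_state(&self) -> HardState;
    fn entries(&self, start: u64, end: u64) -> Vec<LogEntry>;
    fn term(&self, index: u64) -> Option<u64>;
    fn last_index(&self) -> u64;
    fn first_index(&self) -> u64;
    fn snapshot(&self) -> Option<SnapshotMeta>;
}

/// Hypothetical in-memory storage for sync tests; index i lives at entries[i - 1].
#[derive(Default)]
pub struct MemStorage {
    hard_state: HardState,
    entries: Vec<LogEntry>,
}

impl MemStorage {
    /// Test helper: append entries, truncating any conflicting suffix first.
    pub fn append(&mut self, new: &[LogEntry]) {
        if let Some(first) = new.first() {
            assert!(first.index >= 1 && first.index <= self.last_index() + 1, "log must stay contiguous");
            self.entries.truncate((first.index - 1) as usize);
            self.entries.extend_from_slice(new);
        }
    }
}

impl Storage for MemStorage {
    fn hard_state(&self) -> HardState { self.hard_state.clone() }

    fn entries(&self, start: u64, end: u64) -> Vec<LogEntry> {
        // Clamp [start, end) to the stored range.
        let lo = start.max(1);
        let hi = end.min(self.last_index() + 1);
        if lo >= hi {
            return Vec::new();
        }
        self.entries[(lo - 1) as usize..(hi - 1) as usize].to_vec()
    }

    fn term(&self, index: u64) -> Option<u64> {
        if index == 0 {
            return Some(0); // sentinel for the empty prefix
        }
        self.entries.get((index - 1) as usize).map(|e| e.term)
    }

    fn last_index(&self) -> u64 { self.entries.len() as u64 }
    fn first_index(&self) -> u64 { 1 }
    fn snapshot(&self) -> Option<SnapshotMeta> { None }
}

fn main() {
    let mut s = MemStorage::default();
    s.append(&[LogEntry { index: 1, term: 1, data: b"x".to_vec() }]);
    println!("last={} term(1)={:?}", s.last_index(), s.term(1));
}
```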

Message Types

```rust
pub enum Message {
    RequestVote(RequestVoteRequest),
    RequestVoteResponse(RequestVoteResponse),
    AppendEntries(AppendEntriesRequest),
    AppendEntriesResponse(AppendEntriesResponse),
    InstallSnapshot(InstallSnapshotRequest),
    InstallSnapshotResponse(InstallSnapshotResponse),
}
```
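When `step` handles `RequestVote`, it has to apply the standard Raft voting rule: at most one vote per term, and only for a candidate whose log is at least as up-to-date as the local one, comparing last log term first and then last log index (Raft paper, §5.4.1). A minimal sketch of that check; the design does not spell out the fields of `RequestVoteRequest`, so `VoteRequest` and `VoterState` below are hypothetical stand-ins.

```rust
type NodeId = u64;

/// Hypothetical vote request (fields assumed for this sketch).
pub struct VoteRequest {
    pub term: u64,
    pub candidate: NodeId,
    pub last_log_index: u64,
    pub last_log_term: u64,
}

/// The local state the voting rule depends on.
pub struct VoterState {
    pub current_term: u64,
    pub voted_for: Option<NodeId>,
    pub last_log_index: u64,
    pub last_log_term: u64,
}

/// Returns whether to grant the vote, updating term and vote in place.
/// Any change here must go out in Ready::hard_state and be persisted before replying.
pub fn handle_vote(state: &mut VoterState, req: &VoteRequest) -> bool {
    if req.term < state.current_term {
        return false; // stale candidate
    }
    if req.term > state.current_term {
        // Newer term: adopt it and forget the old term's vote
        // (a full core would also step down to follower here).
        state.current_term = req.term;
        state.voted_for = None;
    }
    // Tuples compare lexicographically: last log term first, then last log index.
    let log_ok = (req.last_log_term, req.last_log_index)
        >= (state.last_log_term, state.last_log_index);
    let can_vote = state.voted_for.map_or(true, |v| v == req.candidate);
    if log_ok && can_vote {
        state.voted_for = Some(req.candidate);
        true
    } else {
        false
    }
}

fn main() {
    let mut state = VoterState { current_term: 1, voted_for: None, last_log_index: 5, last_log_term: 1 };
    let req = VoteRequest { term: 2, candidate: 2, last_log_index: 5, last_log_term: 1 };
    println!("granted: {}", handle_vote(&mut state, &req));
}
```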

Multi-Raft Coordinator

```rust
use std::collections::HashMap;

/// Manages multiple Raft groups efficiently
pub struct MultiRaft<S: Storage> {
    node_id: NodeId,
    groups: HashMap<GroupId, RaftCore<S>>,
    storage_factory: Box<dyn Fn(GroupId) -> S>,
}

impl<S: Storage> MultiRaft<S> {
    /// Tick all groups, return aggregated Ready
    pub fn tick(&mut self) -> MultiReady {
        let mut ready = MultiReady::default();
        for (gid, core) in &mut self.groups {
            let r = core.tick();
            ready.merge(*gid, r);
        }
        ready
    }

    /// Route message to appropriate group
    pub fn step(&mut self, gid: GroupId, msg: Message) -> Ready {
        self.groups
            .get_mut(&gid)
            .map(|c| c.step(msg))
            // Unknown group (not yet created or already removed): drop the message
            .unwrap_or_default()
    }

    // Signatures only; bodies are omitted in this design

    /// Propose to specific group
    pub fn propose(&mut self, gid: GroupId, data: Vec<u8>) -> Result<Ready, Error>;

    /// Create new group
    pub fn create_group(&mut self, gid: GroupId, peers: Vec<NodeId>) -> Result<(), Error>;

    /// Remove group
    pub fn remove_group(&mut self, gid: GroupId) -> Result<(), Error>;
}

/// Aggregated Ready with message batching
#[derive(Default)]
pub struct MultiReady {
    /// Messages batched by destination node
    pub messages: HashMap<NodeId, Vec<(GroupId, Message)>>,

    /// Per-group Ready (for storage operations)
    pub groups: HashMap<GroupId, Ready>,
}

impl MultiReady {
    /// Fold one group's Ready in (used by MultiRaft::tick); signature only
    pub fn merge(&mut self, gid: GroupId, ready: Ready);
}
```
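The batching happens in `MultiReady::merge`: outgoing messages are regrouped by destination node, so a single transport call per peer can carry traffic for many groups, while storage and apply work stays keyed by group. A minimal sketch with placeholder `Ready`/`Message` types (assumed; `committed` stands in for all of Ready's storage-related fields):

```rust
use std::collections::HashMap;

type NodeId = u64;
type GroupId = u64;

#[derive(Debug, Clone, PartialEq)]
pub struct Message(pub String); // placeholder payload

#[derive(Default, Debug)]
pub struct Ready {
    pub messages: Vec<(NodeId, Message)>,
    pub committed: Vec<u64>, // stand-in for the storage/apply fields
}

#[derive(Default, Debug)]
pub struct MultiReady {
    pub messages: HashMap<NodeId, Vec<(GroupId, Message)>>,
    pub groups: HashMap<GroupId, Ready>,
}

impl MultiReady {
    pub fn merge(&mut self, gid: GroupId, ready: Ready) {
        // Regroup by destination: one batch per peer, each item tagged with its group.
        for (to, msg) in ready.messages {
            self.messages.entry(to).or_default().push((gid, msg));
        }
        // Keep storage work per group; skip groups that have nothing to do.
        if !ready.committed.is_empty() {
            self.groups.entry(gid).or_default().committed.extend(ready.committed);
        }
    }
}

fn main() {
    let mut batch = MultiReady::default();
    for gid in 1..=3 {
        let hb = Message(format!("heartbeat-{gid}"));
        batch.merge(gid, Ready { messages: vec![(2, hb)], committed: vec![] });
    }
    // Three groups' heartbeats to node 2 travel in one batch.
    println!("{} messages queued for node 2", batch.messages[&2].len());
}
```

With N groups sharing the same peers, this turns up to N heartbeats per tick into one batched send per peer, which is the main reason to merge at the coordinator rather than in each RaftCore.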

Single-Raft Wrapper (for ChainFire)

```rust
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

/// Async wrapper for single Raft group
pub struct RaftNode {
    core: RaftCore<RocksDbStorage>,
    peers: HashMap<NodeId, PeerClient>,
    tick_interval: Duration,
    storage: Arc<RocksDbStorage>,         // write handle (save_hard_state / append_entries)
    state_machine: Box<dyn StateMachine>, // application state machine (trait assumed)
}

impl RaftNode {
    /// Run the event loop (ticks + incoming messages); returns only on an I/O error
    pub async fn start(&mut self) -> Result<(), Error> {
        let mut interval = tokio::time::interval(self.tick_interval);
        loop {
            tokio::select! {
                _ = interval.tick() => {
                    let ready = self.core.tick();
                    self.process_ready(ready).await?;
                }
                msg = self.receive_message() => {
                    let ready = self.core.step(msg);
                    self.process_ready(ready).await?;
                }
            }
        }
    }

    async fn process_ready(&mut self, ready: Ready) -> Result<(), Error> {
        // 1. Persist entries and hard state (must be durable before any reply goes out)
        if let Some(hs) = &ready.hard_state {
            self.storage.save_hard_state(hs)?;
        }
        self.storage.append_entries(&ready.entries_to_persist)?;

        // 2. Send messages; an unknown or unreachable peer is not fatal,
        //    because the leader retransmits on later heartbeats/appends
        for (to, msg) in ready.messages {
            if let Some(peer) = self.peers.get(&to) {
                let _ = peer.send(msg).await;
            }
        }

        // 3. Apply committed entries
        for entry in ready.committed_entries {
            self.state_machine.apply(entry)?;
        }

        // 4. Notify core
        self.core.advance(Applied { ... });
        Ok(())
    }
}
```
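Because the core performs no I/O, the `process_ready` ordering above can be checked without tokio, RocksDB, or gRPC. A minimal sketch, assuming a hypothetical `RecordingDriver` that stands in for storage, network, and state machine and simply logs each side effect, so a plain synchronous test can assert that persistence precedes sending and applying:

```rust
/// Simplified Ready (placeholder field types assumed for this sketch).
#[derive(Default)]
struct Ready {
    hard_state: Option<u64>,      // just the term, for brevity
    entries_to_persist: Vec<u64>, // entry indexes
    messages: Vec<(u64, String)>, // (to, payload)
    committed_entries: Vec<u64>,  // entry indexes
}

/// Hypothetical recording driver: every side effect is appended to `log`.
#[derive(Default)]
struct RecordingDriver {
    log: Vec<String>,
    applied: u64,
}

impl RecordingDriver {
    fn process_ready(&mut self, ready: Ready) {
        // 1. Persist first: a vote or entry must be durable before a reply relying on it leaves.
        if let Some(term) = ready.hard_state {
            self.log.push(format!("persist hs term={term}"));
        }
        for idx in &ready.entries_to_persist {
            self.log.push(format!("persist entry {idx}"));
        }
        // 2. Send messages.
        for (to, msg) in &ready.messages {
            self.log.push(format!("send {to} {msg}"));
        }
        // 3. Apply committed entries.
        for idx in &ready.committed_entries {
            self.log.push(format!("apply {idx}"));
            self.applied = *idx;
        }
        // 4. A real driver would now call core.advance(...) using `self.applied`.
    }
}

fn main() {
    let mut driver = RecordingDriver::default();
    driver.process_ready(Ready {
        hard_state: Some(2),
        entries_to_persist: vec![4],
        messages: vec![(3, "append".to_string())],
        committed_entries: vec![3],
    });
    for line in &driver.log {
        println!("{line}");
    }
}
```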

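Comparison with T041

| Aspect | T041 (current) | T046 (new design) |
|---|---|---|
| I/O | Integrated (executed directly) | Separated (returned via Ready) |
| Timer | Internal (tokio::interval) | External (tick count) |
| async | Required | Not required in the core |
| Multi-Raft | Needs a separate wrapper | Native support |
| Testing | Requires async tests | Sync tests possible |
| Code size | ~1,100 LOC | ~800 LOC (core) |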

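Implementation Plan

| Phase | Scope | Duration |
|---|---|---|
| P1 | Core Refactor (T041→tick-driven) | 1 week |
| P2 | Single-Raft Wrapper (ChainFire) | 3 days |
| P3 | Multi-Raft Coordinator (FlareDB) | 1 week |
| P4 | Advanced (split/merge/cross-shard) | Future |

Total MVP (P1-P3): ~2.5 weeks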

Next Actions

  1. Complete T041 P3 (integration tests)
  2. Start T046 P1: remove I/O from core.rs and implement the Ready pattern
  3. Testing: verify behavior with pure synchronous tests