泰州市网站建设_网站建设公司_需求分析_seo优化
2026/1/16 11:30:10 网站建设 项目流程

标签:#Raft #分布式系统 #Go语言 #算法 #架构设计 #源码解析


🌊 前言:Raft 的世界观

Raft 的核心思想只有一句话:强领导者 (Strong Leader)
整个集群只有 Leader 能写日志,Follower 只能被动接受。Raft 将共识问题分解为三个子问题:

  1. Leader Election:老大挂了,怎么选新老大?
  2. Log Replication:老大怎么把指令强行同步给小弟?
  3. Safety:怎么保证数据不丢、不乱?

Raft 状态流转图 (Mermaid):

超时 (Election Timeout)

获得大多数票

发现更高 Term / 选举超时

发现更高 Term

Follower

Candidate

Leader


🏗️ 一、 骨架:Raft 结构体定义

我们要严格按照论文中的 Figure 2 来定义数据结构。

typeStateintconst(Follower State=iotaCandidate Leader)typeRaftstruct{mu sync.Mutex state State// --- 持久性状态 (所有节点都有) ---currentTermint// 当前任期votedForint// 给谁投过票 (-1 表示没投)log[]LogEntry// 日志条目 {Term, Command}// --- 易失性状态 (所有节点都有) ---commitIndexint// 已知已提交的最高索引lastAppliedint// 已应用到状态机的最高索引// --- Leader 独有状态 ---nextIndexmap[int]int// 发送给每个 Follower 的下一个日志索引matchIndexmap[int]int// 每个 Follower 已经复制到的最高索引// --- 选举相关 ---electionTimer*time.Timer heartbeatTimer*time.Timer}typeLogEntrystruct{TermintCommandinterface{}}

🗳️ 二、 核心一:选主 (Leader Election)

选主的本质是心跳超时
Leader 必须不断发送心跳(空的AppendEntriesRPC)来压制 Follower。一旦 Follower 在ElectionTimeout时间内没收到心跳,它就造反。

1. 触发选举
func(rf*Raft)runElectionTimer(){timeout:=randomTimeout()// 随机 150-300msfor{<-rf.electionTimer.C rf.mu.Lock()ifrf.state!=Leader{// 变成 Candidate,任期 +1,给自己投票rf.state=Candidate rf.currentTerm++rf.votedFor=rf.me rf.persist()// 发起拉票gorf.startElection()// 重置定时器rf.electionTimer.Reset(randomTimeout())}rf.mu.Unlock()}}
2. 处理投票请求 (RequestVote RPC)

这是面试重点:什么情况下我才会给你投票?

  1. 你的 Term >= 我的 Term。
  2. 我这个 Term 还没投过票(或者投的就是你)。
  3. 关键点:你的日志至少和我一样新 (Up-to-Date)。
func(rf*Raft)RequestVote(args*RequestVoteArgs,reply*RequestVoteReply){rf.mu.Lock()deferrf.mu.Unlock()// 1. 如果对方任期小,直接拒绝ifargs.Term<rf.currentTerm{reply.VoteGranted=falsereply.Term=rf.currentTermreturn}// 2. 如果对方任期大,我变成 Followerifargs.Term>rf.currentTerm{rf.currentTerm=args.Term rf.state=Follower rf.votedFor=-1}// 3. 检查日志是否足够新 (比较 LastLogTerm 和 LastLogIndex)lastLogIndex:=len(rf.log)-1lastLogTerm:=rf.log[lastLogIndex].Term upToDate:=falseifargs.LastLogTerm>lastLogTerm{upToDate=true}elseifargs.LastLogTerm==lastLogTerm&&args.LastLogIndex>=lastLogIndex{upToDate=true}// 4. 投票逻辑if(rf.votedFor==-1||rf.votedFor==args.CandidateId)&&upToDate{reply.VoteGranted=truerf.votedFor=args.CandidateId rf.electionTimer.Reset(randomTimeout())// 投票也算收到消息,重置超时}else{reply.VoteGranted=false}reply.Term=rf.currentTerm}

📚 三、 核心二:日志复制 (Log Replication)

一旦选出 Leader,它就开始处理客户端请求,并将日志同步给 Follower。

日志复制流程图 (Mermaid):

FollowerLeaderClientFollowerLeaderClientalt[Log Match (一致)][Log Mismatch (不一致)]1. 发送命令 Set X=12. 写入本地 Log (Index=N)3. AppendEntries RPC (PrevIndex=N-1, Entries=[X=1])写入本地 LogSuccessFalse (Conflict)Decrement nextIndexRetry with older entries4. 收到多数派 Success ->> Commit5. 返回结果
1. Leader 发送日志

Leader 对每个 Follower 维护一个nextIndex

func(rf*Raft)sendAppendEntries(serverint){rf.mu.Lock()ifrf.state!=Leader{rf.mu.Unlock()return}prevLogIndex:=rf.nextIndex[server]-1prevLogTerm:=rf.log[prevLogIndex].Term entries:=rf.log[rf.nextIndex[server]:]// 发送从 nextIndex 开始的所有日志args:=AppendEntriesArgs{Term:rf.currentTerm,LeaderId:rf.me,PrevLogIndex:prevLogIndex,PrevLogTerm:prevLogTerm,Entries:entries,LeaderCommit:rf.commitIndex,}rf.mu.Unlock()varreply AppendEntriesReplyifrf.peers[server].Call("Raft.AppendEntries",&args,&reply){rf.mu.Lock()deferrf.mu.Unlock()ifreply.Term>rf.currentTerm{rf.state=Follower rf.currentTerm=reply.Termreturn}ifreply.Success{// 复制成功:更新 matchIndex 和 nextIndexrf.matchIndex[server]=prevLogIndex+len(entries)rf.nextIndex[server]=rf.matchIndex[server]+1rf.updateCommitIndex()// 尝试推进 CommitIndex}else{// 复制失败:回退 nextIndex (简单优化版:直接减一,或者利用 reply 中的 conflictIndex 快速回退)rf.nextIndex[server]--}}}
2. Follower 处理日志 (一致性检查)

这是 Raft 保证数据不乱的关键:Log Matching Property
Follower 在追加日志前,必须检查PrevLogIndexPrevLogTerm是否匹配。

func(rf*Raft)AppendEntries(args*AppendEntriesArgs,reply*AppendEntriesReply){rf.mu.Lock()deferrf.mu.Unlock()// 1. Term 检查ifargs.Term<rf.currentTerm{reply.Success=falsereply.Term=rf.currentTermreturn}// 保持 Follower 状态,重置选举超时rf.state=Follower rf.electionTimer.Reset(randomTimeout())// 2. 一致性检查:我这里的 PrevLogIndex 处是否有日志?且 Term 是否匹配?iflen(rf.log)<=args.PrevLogIndex||rf.log[args.PrevLogIndex].Term!=args.PrevLogTerm{reply.Success=falsereturn// 告诉 Leader 回退}// 3. 冲突解决:删除冲突后的所有日志,追加新日志// 注意:不要无脑覆盖,只有冲突时才截断,防止过期包覆盖新日志insertIndex:=args.PrevLogIndex+1fori,entry:=rangeargs.Entries{ifinsertIndex+i>=len(rf.log){rf.log=append(rf.log,entry)}elseifrf.log[insertIndex+i].Term!=entry.Term{rf.log=rf.log[:insertIndex+i]// 截断rf.log=append(rf.log,entry)}}// 4. 更新 CommitIndexifargs.LeaderCommit>rf.commitIndex{rf.commitIndex=min(args.LeaderCommit,len(rf.log)-1)}reply.Success=true}

🔐 四、 难点解析:Safety (Commit 的条件)

Leader 什么时候可以把日志标记为Committed并应用到状态机?
答:当这个日志条目被复制到了“大多数”节点上。

func(rf*Raft)updateCommitIndex(){// 寻找一个 N,满足 N > commitIndex,且大多数节点的 matchIndex >= N,且 log[N].term == currentTermforN:=len(rf.log)-1;N>rf.commitIndex;N--{count:=1// 自己算一票forpeer:=rangerf.peers{ifpeer!=rf.me&&rf.matchIndex[peer]>=N{count++}}ifcount>len(rf.peers)/2&&rf.log[N].Term==rf.currentTerm{rf.commitIndex=N// 触发 Apply 线程...break}}}

注意:Leader 只能提交当前 Term的日志。对于旧 Term 的日志,即使复制到了多数派,也不能直接提交,必须等待当前 Term 的日志提交时间接提交(Figure 8 问题)。


🎯 总结

手写 Raft 的过程,就是理解**“一致性”“可用性”**博弈的过程。

  • RequestVote保证了 Leader 的合法性。
  • AppendEntriesPrevLog检查保证了日志的连续性。
  • Majority Quorum保证了脑裂时不会产生两个有效 Leader。

当你能闭着眼睛画出nextIndex的回退流程时,你对分布式系统的理解已经超越了 90% 的 CRUD 工程师。

Next Step:
上面的代码是内存版的。尝试引入LevelDB或简单的文件 IO,在persist()函数中实现currentTermvotedForlog的持久化,让你的 Raft 在进程崩溃重启后依然能恢复数据。这就是一个微型 Etcd 的雏形!

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询