Leader 不参与读请求?etcd 线性读实现揭秘

📅 2026/6/30 4:34:01
Leader 不参与读请求?etcd 线性读实现揭秘
这不禁让人产生几个疑问Follower 如何知道自己读到的是最新数据Leader 如何向 Follower 证明某个状态已经提交为什么 ReadIndex 不需要写日志却依然能够保证线性一致性本文将结合 etcd 源码分析线性一致性读的完整实现。线性读和可串行化读区别在介绍源码前先讲下线性读和可串行化读的区别。线性读满足 CAP 理论中 C 的读模式意味着只要写“成功”commited的数据线性读一定可以读到该数据否则将不可用它满足 CAP 理论的 CP 模式。可串行化读不会与 Leader 进行额外交互因此可能读到落后的数据。它牺牲了一致性要求换取更低的延迟和更高的可用性。leader/follower 节点可自然的提供可串行化读服务它们的处理逻辑是一样的都是根据请求从本地存储引擎读数据。而线性读因为是“最新”的数据当 follower 接收到线性读请求时可以把请求转发给 leader 处理因为 leader 知道哪些是“最新”的这也是很自然的。但是线性读请求太多etcd 用线性读还是很普遍的比如 kubernetes会造成 leader 节点负载过大leader 只有一个严重的话可能会造成服务不可用。etcd 解决这种问题的方案是 follower 节点也可以处理线性读请求。follower 节点通过和 leader 节点交互获取“最新”数据然后在本地存储引擎读取最新数据。这种方式大大减轻了 leader 节点的压力。笔者觉得这是非常巧妙的解决方案。func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { ... // 1. 判断是否开始线性读 if !r.Serializable { // 2. 进入线性读处理逻辑 err s.linearizableReadNotify(ctx) ... if err ! nil { return nil, err } } ... // 3. get 用于读存储引擎中的数据 get : func() { resp, _, err txn.Range(ctx, s.Logger(), s.KV(), r) } // 4. 可串行化读流程会回调 get 获取数据 if serr : s.doSerialize(ctx, chk, get); serr ! nil { err serr return nil, err } // 5. 返回 resp 数据 return resp, err }整个读请求流程由EtcdServer.Range处理注意 leader 或 follower 都可以处理该请求。线性读和可串行化读的区别在于是否进入 2 线性读处理逻辑。这里 2 是核心我们在进入 2 之前先把可串行化读逻辑看一遍毕竟这部分是共用的。可串行化读可串行化读主要由 3 和 4 处理3 是一个闭包函数通过 4 回调。我们看下 4 在干嘛func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) error, get func()) error { // 处理认证鉴权 ai, err : s.AuthInfoFromCtx(ctx) if err ! nil { return err } ... // 回调 get 函数 get() ... return nil }不难理解如果请求通过认证鉴权就会进入 get 函数处理该读请求func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, r *pb.RangeRequest) (resp *pb.RangeResponse, trace *traceutil.Trace, err error) { ... // 调用 mvcc kv.Read 获取读事务 txnRead : kv.Read(mvcc.SharedBufReadTxMode, trace) defer txnRead.End() // executeRange 处理读请求 resp, err executeRange(ctx, lg, txnRead, r) return resp, trace, err } func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) { ... // 根据请求组装 mvcc.RangeOptions 结构 ro : mvcc.RangeOptions{ Limit: limit, Rev: r.Revision, Count: r.CountOnly, } // 读事务读请求的数据 rr, err : txnRead.Range(ctx, r.Key, mkGteRange(r.RangeEnd), ro) if err ! nil { return nil, err } // 对请求数据 rr 进行处理得到 resp 并返回 return resp, nil }核心工作流程是通过读事务的Range方法获取存储引擎中的数据。这是可串行化读的流程我们从之前的分析可知线性读需要和 leader 节点交互获取“最新”数据然后等本地更新最新数据之后才能走可串行化读流程这部分就是在linearizableReadNotify做的。线性读是时候看看期待已久的linearizableReadNotify在做什么了。func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error { s.readMu.RLock() nc : s.readNotifier s.readMu.RUnlock() // signal linearizable loop for current notify if it hasnt been already select { case s.readwaitc - struct{}{}: default: } // wait for read state notification select { case -nc.c: return nc.err case -ctx.Done(): return ctx.Err() case -s.done: return errors.ErrStopped } }方法并不复杂不过逻辑很有意思。首先理清它的流程才能知道这个方法到底在干嘛。方法先把s.readNotifier赋给变量 nc注意这里用的是读锁意味着读请求都将获得s.readNotifier。接着进入 select它将struct{}{}发给s.readwaitc通道该通道是一个有缓冲容量为 1 的通道s.readwaitc make(chan struct{}, 1)这个 select 是在做什么呢我们想象下第一个请求进来往s.readwaitc发通知如果s.readwaitc还在处理请求那么同时来的其它请求会进入select:default分支。最后请求都会阻塞在nc.c等待处理结果。大致知道发送端的流程我们看消费端是怎么消费s.readwaitc的func (s *EtcdServer) linearizableReadLoop() { for { // 生成下一个请求的 id requestID : s.reqIDGen.Next() select { ... // 阻塞在 readwaitc 通道 case -s.readwaitc: ... } // 生成下一个 notifier nextnr : newNotifier() // 加写锁 s.readMu.Lock() // 将 s.readNotifier 赋值给变量 nr nr : s.readNotifier // 将下一个 notifier 赋值给 s.readNotifier s.readNotifier nextnr s.readMu.Unlock() confirmedIndex, err : s.requestCurrentIndex(leaderChangedNotifier, requestID) if isStopped(err) { return } if err ! nil { nr.notify(err) continue } // 获取本节点应用索引 appliedIndex : s.getAppliedIndex() // 如果本节点应用索引小于 confirmed 索引说明请求的数据还没应用到本节点 // 需要继续等 if appliedIndex confirmedIndex { select { case -s.applyWait.Wait(confirmedIndex): case -s.stopping: return } } // 如果本节点应用索引大于等于 confirmed 索引说明请求的数据已应用到本节点 // 直接返回即可 nr.notify(nil) ... } }可以把 readwaitc 理解为一个“批处理触发器”。第一个到达的读请求负责通知后台协程执行一次 ReadIndex。后续同时到达的读请求不会再次触发 ReadIndex而是挂到同一个 notifier 上等待结果。因此1000 个同时到达的线性读请求最终只会触发一次 Leader 交互。这正是 etcd 线性读高性能的重要原因。我们也可以用sync.Cond来实现如果多个请求发现已经有请求在处理了则休眠等待请求处理完被唤醒。但是sync.Cond不能天然的表示分批的概念不如通道实现来的优雅。etcd 用这种方式高效且巧妙的解决了高并发分批读请求。linearizableReadLoop接收到读请求通知后会和 leader 交互换取现在最新的 committed index后续根据该 committed index 从本地读数据。如果本地应用 index 已经大于等于 committed index则表示本地已经是最新了可以直接走可串行化读。如果本地应用 index 小于 committed index表示本地不是最新数据需要等待本地 raft 更新应用 index 到 committed index 才能开始可串行化读。这里的重点在// requestCurrentIndex 获取 confirmed index也就是 leader 的 committed index confirmedIndex, err : s.requestCurrentIndex(leaderChangedNotifier, requestID) func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier -chan struct{}, requestID uint64) (uint64, error) { // sendReadIndex 发送 ReadIndex 消息给 leader err : s.sendReadIndex(requestID) if err ! nil { return 0, err } ... for { select { case rs : -s.r.readStateC: // 阻塞等待 raftNode.readStateC 通道 requestIDBytes : uint64ToBigEndianBytes(requestID) // 判断返回的响应是否是本次请求的响应 gotOwnResponse : bytes.Equal(rs.RequestCtx, requestIDBytes) if !gotOwnResponse { ... // 如果不是继续阻塞等本次请求的响应 continue } // 如果是返回本次请求的 committed index return rs.Index, nil ... } ... }requestCurrentIndex的逻辑包含两块:sendReadIndex发送ReadIndex消息给 leaderfunc (s *EtcdServer) sendReadIndex(requestIndex uint64) error { ctxToSend : uint64ToBigEndianBytes(requestIndex) cctx, cancel : context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) // 调用本机 raft 发送 ReadIndex 消息给 leader err : s.r.ReadIndex(cctx, ctxToSend) cancel() .. return nil } func (n *node) ReadIndex(ctx context.Context, rctx []byte) error { // 发送 MsgReadIndex 给 leader return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}}) }阻塞等待本节点raftNode.readStateC通道获取 leader 的响应requestCurrentIndex 的核心目标只有一个向 Leader 询问当前已确认的 committed index。leader 处理线性读请求follower 将pb.MsgReadIndex消息发送给 leaderleader 是如何处理的呢我们继续看func stepLeader(r *raft, m pb.Message) error { switch m.Type { case pb.MsgReadIndex: // 判断是否只有一个投票节点leader 自己如果是的话 leader 直接返回它的 committed index if r.trk.IsSingleton() { if resp : r.responseToReadIndexReq(m, r.raftLog.committed); resp.To ! None { r.send(resp) } return nil } // 检查 leader 是否是新上任的 leader且当前 term 还没有写 index if !r.committedEntryInCurrentTerm() { // 如果是新上任 leader 且没有写 index 则进入 pendingReadIndexMessages r.pendingReadIndexMessages append(r.pendingReadIndexMessages, m) return nil } // 如果当前 leader 的 term 已经有 index 则返回 sendMsgReadIndexResponse(r, m) return nilleader 的处理逻辑分三块判断集群是不是只有一个 leader如果是直接返回如果不是则判断该 leader 在当前 term 下有没有写 index如果没有写则进入raft.pendingReadIndexMessages处理如果当前 term 有写 index则返回这里为什么要判断 leader 在当前 term 有没有写 index 呢实际上对应 Raft 论文 Figure 8 的经典场景。Raft 证明了 旧 Term 的日志即使已经复制到多数节点Leader 也不能直接认为它已经 committed。因为未来仍然可能出现新的 Leader并覆盖这些旧日志。因此Leader 必须先提交一条当前 Term 的日志。一旦当前 Term 的日志被提交根据 Raft 的安全性证明其之前的所有日志也必然已经 committed。这也是 committedEntryInCurrentTerm 存在的原因。committedEntryInCurrentTerm如下func (r *raft) committedEntryInCurrentTerm() bool { // 判断当前 term 有没有提交 index return r.raftLog.zeroTermOnOutOfBounds(r.raftLog.term(r.raftLog.committed)) r.Term }如果当前 term 已经发过 index 了则直接处理ReadIndex响应func sendMsgReadIndexResponse(r *raft, m pb.Message) { case ReadOnlySafe: r.readOnly.addRequest(r.raftLog.committed, m) r.readOnly.recvAck(r.id, m.Entries[0].Data) r.bcastHeartbeatWithCtx(m.Entries[0].Data) case ReadOnlyLeaseBased: if resp : r.responseToReadIndexReq(m, r.raftLog.committed); resp.To ! None { r.send(resp) } } }这里响应前需要确保当前 leader 是有效 leader。有两种响应类型一种是走一遍心跳如果当前节点得到多数节点响应则返回 committed index。另一种是根据ReadOnlyLease判断是否是 leader它不需要走一遍心跳。优点是快缺点是它在时间窗口内确定自己是不是 leader如果时间同步有问题则无法确定自己是不是 leader几乎不可能出现这种情况。leader 返回给 follower 的是pb.MsgReadIndexResp消息func (r *raft) responseToReadIndexReq(req pb.Message, readIndex uint64) pb.Message { ... return pb.Message{ Type: pb.MsgReadIndexResp, To: req.From, Index: readIndex, Entries: req.Entries, } }如果 leader 在当前 term 没有提交过 index则将MsgReadIndex消息加入raft.pendingReadIndexMessagesif !r.committedEntryInCurrentTerm() { r.pendingReadIndexMessages append(r.pendingReadIndexMessages, m) return nil }加入到pendingReadIndexMessagesleader 就返回并未给 follower 返回响应消息那 leader 中是哪里在处理 pendingReadIndexMessages 数组呢由于 leader 中当前 term 还没有提交 indexleader 中的 raft 会提交一个 indexfunc stepLeader(r *raft, m pb.Message) error { switch m.Type { case pb.MsgAppResp: pr.RecentActive true if m.Reject { // 处理拒绝逻辑 ... } else { if pr.MaybeUpdate(m.Index) || (pr.Match m.Index pr.State tracker.StateProbe) { ... if r.maybeCommit() { // 当前 term 已提交 index进入 releasePendingReadIndexMessages releasePendingReadIndexMessages(r) r.bcastAppend() } ... } ... } ... }提交完 index 后进入releasePendingReadIndexMessagesfunc releasePendingReadIndexMessages(r *raft) { // 判断是否有 ReadIndex 请求需要发送 if len(r.pendingReadIndexMessages) 0 { // 如果没有则返回 return } // 判断当前 term 是否已经提交 index理论上是已经提交了 if !r.committedEntryInCurrentTerm() { r.logger.Error(pending MsgReadIndex should be released only after first commit in current term) return } // 清空 pendingReadIndexMessages 数组 msgs : r.pendingReadIndexMessages r.pendingReadIndexMessages nil // 遍历消息将 ReadIndexResp 消息发给 follower 节点 for _, m : range msgs { sendMsgReadIndexResponse(r, m) } }可以看到在 raft 内部提交当前 term 的 index 后会去判断pendingReadIndexMessages中的数组是否有消息如果有则遍历数组返回MsgReadIndexResp的响应给 follower。至此我们知道 leader 是如何处理并响应 follower 的MsgReadIndex请求。那么follower 又是处理 leader 的发来的MsgReadIndexResp响应的呢follower 处理 MsgReadIndexResp 消息leader 发来的响应包括请求 id 和 committed index 的对应关系。follower 在 raft 状态机处理 leader 发来的MsgReadIndexResp响应func stepFollower(r *raft, m pb.Message) error { switch m.Type { ... case pb.MsgReadIndexResp: ... // 将返回的 committed index 和 request id 组成 ReadState 放入 raft.readStates 数组 r.readStates append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data}) } return nil }follower 会将ReadState发给raft.readStates数组。raft 会将数组组成 ready 消息发给上层应用状态机func (r *raftNode) start(rh *raftReadyHandler) { go func() { ... for { select { case rd : -r.Ready(): if len(rd.ReadStates) ! 0 { select { // 将响应发给 raft.readStateC 通道 case r.readStateC - rd.ReadStates[len(rd.ReadStates)-1]: ... } ... } ... } ... }终于经过 follower - leader - follower 这一圈获取到 请求 id 对应的最新的 committed index并且 index 写入到 follower 节点的 raft.readStateC 通道。requestCurrentIndex会监听该通道并处理后面的逻辑就不复杂了篇幅有限就不介绍了。小结etcd 线性读流程示意图如下LeaderFollowerClientLeaderFollowerClientRange RequestMsgReadIndexConfirm LeadershipReadIndexResp(index100)applyWait.Wait(100)wait applyIndex 100MVCC ReadResponseetcd 线性读的核心思想可以概括为三步Follower 通过 ReadIndex 向 Leader 获取当前已确认的 committed index等待本地 applyIndex 追赶到该 committed index