MIT 6.824 Raft Lab 笔记

07 Nov 2020 by fleuria

总算做了一把 MIT6.824 的 Raft Lab,还有点小问题未明,好像在完整跑 2C Test Suite 有两个 case 仍不大稳定,先不管了记一下先。

事件驱动

在 raft-structure 文档中提到有 channel 传递消息和 locking 两种代码组织的方式,助教更建议使用 shared structure + lock 的方式,因为「Experience suggests that for Raft it is most straightforward to use shared data and locks」。不过个人仍比较倾向于使用消息驱动的方式来组织代码,主要的好处是好测试,尤其是时钟,可以通过时钟消息来驱动测试,相比较之下 timer 就不那么容易 mock 了。

事件驱动的思路是,一切请求都从一个 chan 进入分派,主循环对这个 chan 的事件消息做分派处理,输入方等待事件完成得到响应结果。

在 Lab2 代码的脚手架中,我们的请求入口是 RequestVote 和 AppendEntries 两个 RPC Handler:

func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	// Your code here (2A, 2B).
	ev := newRaftEV(args)
	select {
	case <-rf.quitc:
		return
	case rf.eventc <- ev:
	}

	<-ev.c
	if ev.result == nil {
		panic("unexpected nil result")
	}
	*reply = *(ev.result.(*RequestVoteReply))
}

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	ev := newRaftEV(args)
	select {
	case <-rf.quitc:
		return
	case rf.eventc <- ev:
	}
	<-ev.c
	if ev.result == nil {
		panic("unexpected nil result")
	}
	*reply = *(ev.result.(*AppendEntriesReply))
}

每当收到输入,则 newRaftEV(args) 将请求体包进 raftEV 结构体 ev,传入 rf.eventc,随后等待 ev.c 通知,从 ev.result 中拿到响应结果后返回。raftEV 的定义很简单:

type raftEV struct {
	args   interface{}
	c      chan struct{}
	result interface{} // maybe nil
}

func newRaftEV(args interface{}) *raftEV {
	return &raftEV{
		c:      make(chan struct{}, 1),
		args:   args,
		result: nil,
	}
}

func (ev *raftEV) Done(result interface{}) {
	ev.result = result
	close(ev.c)
}

这里有个小 trick 是只使用 chan 的 close 起到通知的作用,并没有直接使用 ev.c 来返回响应值,而是将返回值放在 ev.result 中。

Raft Loop

在事件驱动的代码结构下,raft 的主循环就是一次 ev 的分派,根据不同的 raft 角色进行不同的处理逻辑:

func (rf *Raft) loopEV() {
	defer rf.routineGroup.Done()

	for {
		select {
		case ev := <-rf.eventc:
			switch rf.state {
			case RaftLeader:
				rf.stepLeader(ev)

			case RaftCandidate:
				rf.stepCandidate(ev)

			case RaftFollower:
				rf.stepFollower(ev)
			}

		case <-rf.quitc:
			DPrintf("%v loopEV.quited", rf.logPrefix())
			return
		}
	}
}

先看 stepLeader 为例:

func (rf *Raft) stepLeader(ev *raftEV) {
	switch v := ev.args.(type) {
	case *TickEventArgs:
		rf.heartbeatTimeoutTicks--
		if rf.heartbeatTimeoutTicks <= 0 {
			rf.heartbeatTimeoutTicks = defaultHeartBeatTimeoutTicks
			rf.broadcastAppendEntries()
		}
		ev.Done(nil)

	case *AppendEntriesReply:
		rf.processAppendEntriesReply(v)
		ev.Done(nil)

	case *AppendEntriesArgs:
		reply := rf.processAppendEntries(v)
		ev.Done(reply)

	case *RequestVoteArgs:
		reply := rf.processRequestVote(v)
		ev.Done(reply)

	case *DispatchCommandArgs:
		reply := rf.processDispatchCommand(v)
		ev.Done(reply)

	default:
		DPrintf("%v step-leader.unexpected-ev %#v", rf.raftInfo(), v)
		ev.Done(nil)
	}
}

来自 AppendEntries / RequestVote 请求的 ev 需要保证返回 reply,其他内部的事件消息如时钟事件,只要返回 nil 即可。此外,发送 AppendEntries 请求收到的响应,也会被视为事件,允许做异步处理。

时钟

在 stepLeader 的代码中可以见到里面有一个 TickEventArgs 的事件类型,以及一个 rf.heartbeatTimeoutTicks 字段来维护触发下次超时的 Tick 数,一次 Tick 等同于固定大小的时间片。

这里在自己写测试时候好处比较大,直接向 eventc 分发 Ticks 事件,对 Raft 实例而言就等同于时间向前走了。不需要在 Test Case 中按真实时间做 Sleep 等待。

生成 Ticks 的逻辑也简单:

func (rf *Raft) loopTicks() {
	defer rf.routineGroup.Done()

	for {
		time.Sleep(defaultTickIntervalMs * time.Millisecond)
		select {
		case <-rf.quitc:
			return
		case rf.eventc <- newRaftEV(&TickEventArgs{}):
		}
	}
}

异步请求

在事件驱动的流程中要注意避免有阻塞主循环的操作出现,脚手架中 rf.sendAppendEntries RPC 请求是一个阻塞的方法,可以简单开一个 goroutine 实现异步化,并且将响应结果也作为异步事件进行分派:

func (rf *Raft) broadcastAppendEntries() {
	for peerID, nextIndex := range rf.nextIndex {
		if peerID == rf.me {
			continue
		}

		args := rf.prepareAppendEntriesArgs(nextIndex)

		go func(peerID int) {
			reply := AppendEntriesReply{}
			ok := rf.sendAppendEntries(peerID, args, &reply)
			// DPrintf("%v heartbeat [%v] reply=%#v", rf.raftInfo(), peerID, reply)

			emptyReply := AppendEntriesReply{}
			if ok && reply != emptyReply {
				select {
				case <-rf.quitc:
				case rf.eventc <- newRaftEV(&reply):
				}
			}
		}(peerID)
	}
}

Test Case 中会模拟请求失败、超时乃至丢失的情况。响应超时/丢失对于 AppendEntries 和 RequestVote 的发送方都问题不大,前者会到 heartbeatTimeout 时继续基于 nextIndex 做重试,后者会到 electionTimeout 时重新发起选举。

会存在这样的 Case 就是一个响应超时了很久,乃至新 Leader 都当选了才收到这个响应,这里仍没有关系,响应的处理函数中也会检查响应体中的 Term 起到防御的作用。

Raft 结构定义

出于事件驱动的需要,RPC 请求的响应结果也有保存在 Raft 结构体中的必要,如 voteGranted、nextIndex、matchIndex 几个字段,回过头来看 Raft 结构体的定义:

//
// A Go object implementing a single Raft peer.
//
type Raft struct {
	mu        sync.Mutex          // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()

	// Your data here (2A, 2B, 2C).
	// Look at the paper's Figure 2 for a description of what
	// state a Raft server must maintain.
	state        RaftState
	eventc       chan *raftEV
	quitc        chan struct{}
	quitOnce     sync.Once
	applyCh      chan ApplyMsg
	routineGroup sync.WaitGroup

	logEntries []raftLogEntry

	// volatile state on leaders
	nextIndex  []int
	matchIndex []int

	// volatile state on all servers
	commitIndex int
	applyIndex  int

	// volatile state on candidates
	voteGranted map[int]bool

	// persistent state on all servers
	term     int
	votedFor int

	// clock in ticks
	heartbeatTimeoutTicks uint
	electionTimeoutTicks  uint
}

遇到的问题

在跑完整的 2A / 2B / 2C 的 Test Suite 之前,可以从 test_test.go 中找出来单独的测试方法单独跑,比如:

go test -run TestBasicAgree2B

常见的报错有 apply error 和 failed to reach agreement 两种。

apply error

指日志存在不一致现象,数据乱掉了。这类错误还相对容易解决,看 Raft 论文的各种 if else 条件查查哪里不对一般就能解决。

再就是 persist() 的位置,在任何一个修改到 term、votedFor 和 logEntries 时都必须要持久化。一开始有漏掉修改 logEntries 时的持久化而遇到了这个报错。

failed to reach agreement

相对软性一点,意思是有 Split-vote,两个合法的 Candidate 长时间没有选举出新 Leader。

不得不说 electionTimeout 的重置一定要各种小心。在 TestPersist22C 中遇到一个这个错误,原因之前每次转为 Follower 就重置 ElectionTimer,实际上应当只有在 Grant a Vote 时重制 timer,收到更高 Term 的消息时会转为 Follower 但不重置 Timer。

Students’ Guide 中强调了好几次 Election Timer 的重要性,千千万万不要乱设置:

Make sure you reset your election timer exactly when Figure 2 says you should.

However, if you read Figure 2 carefully, it says:If election timeout elapses without receiving AppendEntries RPC from current leader or granting vote to candidate: convert to candidate.

Raft 定义中重置 ElectionTimer 的位置是海量专精细算过的,千万不要随意设置

千万不要一收到 appendEntries 就重置 electionTimout,先检查这个请求对不对劲,确认 term logIndex 啥的没问题再重置!

千万不要一转成 Candidate 时就重置 electionTimeout,只有当 Grant 这个 Vote 时才重置,不然选不出来!

此外还遇到一个 Case 是一开始没有处理好 goroutine 的退出,在单个 Test Case 都过了,跑完整的 Test Suite 会遇到这个报错,大概是泄露的 gorotine 影响了执行的 timing。代码注释里有留这段提示:

// the issue is that long-running goroutines use memory and may chew
// up CPU time, perhaps causing later tests to fail and generating
// confusing debug output. any goroutine with a long-running loop
// should call killed() to check whether it should stop./

调试技巧

真的有很多细节是撞到才懂。Students’ Guide 跟原论文不经意一句话,其实是很重要的实现要点。遇到问题时,搞各种 debug 往往不如去找 Students’ Guide 查哪里违反了 Raft 的不变式,有事半功倍的功效

打日志是调试中查看 Raft 集群状态的重要手段。在打日志时,总是在日志的开头将 raft 节点的状态信息按统一的格式打出来会直观一些,如 applyIdx、term 等信息,在大多数状态迁移中都是有用的:

func (rf *Raft) raftInfo() string {
	lastIndex, _ := rf.lastLogInfo()
	return fmt.Sprintf("[%d %v term:%d applyIdx:%d commitIdx:%d electionTicks:%d lastIndex:%#v]", rf.me, rf.state.String(), rf.term, rf.applyIndex, rf.commitIndex, rf.electionTimeoutTicks, lastIndex)
}

一般上班时候会比较侧重于在客户端侧打日志,记录请求失败等信息,然而 Raft 的测试集中请求失败过于常态,记录客户端请求失败的日志对调试干扰很大,不如只打服务端接收到的请求日志,失败的请求当没有存在过,多长时间没有收到请求超时了进入下一操作,对问题的排查基本上没有损失。

References