MIT 6824 Lab 2D总结
这是lab 2d的最后一个实验,但是实现却很费力,因为花了很大力气重写之前的部分代码,一不小心就会导致错误,而且非常难调。这篇文章分两部分,首先总结下为2d所做的调整,第二部分再介绍2d的具体实现
1. 实现优化
1.1. 删除checkCommit函数
在之前的实现中,使用了一个checkCommitTimer,每10ms调用一次,它的作用是检测是否有可以被commit的index。但是我们回过头来想想commit index这件事情,它需要所有peer的matchIndex在某个N上达到majority,所以我们只需要在matchIndex有改变的时候来检测是否能进行commit即可
那么在什么时候matchIndex会有变化呢?目前来看有两个,第一是某个peer回复AppendEntries成功;第二个是某个peer installSnapshot成功(具体后面会提到),而对于installSnapshot,可以保证snpshot中的data已经被leader commit过,所以只有在处理AppendEntries response的时候会发生,故而我们在这里进行检测,删除之前每10ms调用一次的timer
baseIndex := rf.log[0].Index
updatedCommitIndex := rf.commitIndex
for i := rf.commitIndex + 1; i <= rf.getLastLog().Index; i++ {
N := i
replicateCount := 1
for peer := range rf.peers {
if peer == rf.me {
continue
}
if rf.matchIndex[peer] >= N {
replicateCount += 1
}
DPrintf("Raft %d comparing index at %d for peer %d: rf.matchIndex[peer]=%d, N=%d, log=%v", rf.me, N, peer, rf.matchIndex[peer], N, rf.log[N-baseIndex])
if replicateCount*2 > len(rf.peers) && rf.log[N-baseIndex].Term == rf.currentTerm {
DPrintf("Raft %d received majority at index %d, will set rf.commitIndex to %d", rf.me, N, N)
updatedCommitIndex = N
break
}
}
}
rf.commitIndex = updatedCommitIndex
if rf.commitIndex > rf.lastApplied {
rf.cond.Broadcast()
}
1.2. 将index加入LogEntry中,停止使用len(rf.log)
之前的实现里使用len(rf.log)来得到当前最后的一个LogEntry的index,在2a,2b和2c中这样做没问题,因为log不会缩减,但是在2d中,log的前一部分随时可能会被切掉。因此再使用len(rf.log)会带来很大的问题,比较优雅的解决方案是在LogEntry中加入index,并且使用log[0]来存储baseIndex,实现log index与commitIndex/lastApplied的转换
在Start函数,也就是有新的log要加入时,我们根据当前log最后一个LogEntry的index计算出新log的index
index := rf.getLastLog().Index + 1
newLogEntry := LogEntry{Term: term, Command: command, Index: index}
1.3. 异步applier中进行批处理
之前的实现里,异步applier中有一个for循环,在for循环中每次只会apply一个LogEntry,但其实当conditional variable被触发之后,所有在[rf.lastApplied, rf.commitIndex]
之前的LogEntry都是可以被apply的
因此我们修改代码在一个for循环迭代中将这个batch给apply,但要注意的是在apply的过程中(将LogEntry送到对应channel)需要释放锁
for !rf.killed() {
rf.mu.Lock()
for rf.commitIndex <= rf.lastApplied {
rf.cond.Wait()
}
commitIndex := rf.commitIndex
lastApplied := rf.lastApplied
entries := make([]LogEntry, commitIndex-lastApplied)
baseIndex := rf.log[0].Index
DPrintf("Raft %d prepare to apply log, rf.lastApplied=%d, baseIndex=%d, %v", rf.me, rf.lastApplied, baseIndex, rf.log)
copy(entries, rf.log[lastApplied+1-baseIndex:commitIndex+1-baseIndex])
rf.mu.Unlock()
for _, entry := range entries {
DPrintf("Raft %d will apply log at %d: %v", rf.me, entry.Index, entry)
rf.applyCh <- ApplyMsg{
CommandValid: true,
Command: entry.Command,
CommandIndex: entry.Index,
}
}
...
}
1.4. 修改AppendEntries handler中的bug
之前的实现中有一个bug没有测出来,就是在AppendEntries的实现中会直接把votedFor设置为-1。正确的做法是只有request中的term大于当前term的时候才将votedFor设置为-1,原因是如果在当前term中就将votedFor设置为-1,那么很有可能这个node在同一个term中会对不同的node在RequestVote中投多次票,肯定是错误的
2. Log Compaction
2.1. len(rf.log)
这里有提到一次len(rf.log)是因为它实在太重要而且容易错,要仔细地把几乎所有的len(rf.log)替换成rf.getLastLog().Index,比如在比较log up-to-date时,其余的不再赘述
2.2. Snapshot和CondInstallSnapshot
Snapshot将up-to index的所有log替换成snapshot
func (rf *Raft) Snapshot(index int, snapshot []byte) {
// Your code here (2D).
rf.mu.Lock()
defer rf.mu.Unlock()
prevSnapshotIndex := rf.log[0].Index
if index <= prevSnapshotIndex {
// this snapshot is already included, do nothing
return
}
rf.log = rf.log[index-prevSnapshotIndex:]
rf.log[0].Command = nil
DPrintf("Raft %d applied Snapshot up to %d, setting base index to %d, length %d", rf.me, index, rf.log[0].Index, len(rf.log))
rf.persister.SaveStateAndSnapshot(rf.serializeState(), snapshot)
}
注意这里其实不需要设置rf.log[0].Index为参数index,因为在truncate log的时候这一步已经自动完成
CondInstallSnapshot实现中要注意三点,首先如果当前的commitIndex大于lastIncludedIndex没太大必要install snapshot,因为本地已经包括了这个snapshot的数据;第二点就是如果本地的log落后于snapshot,应该创建一个长度为1的log,使用index 0保存baseIndex;最后要使用lastIncludedTerm, lastIncludedIndex对自身的状态进行更新
func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool {
// Your code here (2D).
rf.mu.Lock()
defer rf.mu.Unlock()
if lastIncludedIndex <= rf.commitIndex {
return false
}
if lastIncludedIndex > rf.getLastLog().Index {
rf.log = make([]LogEntry, 1)
} else {
rf.log = rf.log[lastIncludedIndex-rf.log[0].Index:]
}
rf.log[0].Term = lastIncludedTerm
rf.log[0].Index = lastIncludedIndex
DPrintf("Raft %d setting rf.lastApplied to %d", rf.me, lastIncludedIndex)
rf.lastApplied = lastIncludedIndex
rf.commitIndex = lastIncludedIndex
DPrintf("Raft %d processed CondInstallSnapshot, shrinking log base index to %d, length %d", rf.me, rf.log[0].Index, len(rf.log))
rf.persister.SaveStateAndSnapshot(rf.serializeState(), snapshot)
return true
}
Snapshot和CondInstallSnapshot的作用对象不同,Snapshot是state machine调用leader,而CondInstallSnapshot则是state machine调用follower
2.3. InstallSnapshot RPC
InstallSnapshot RPC request/response的定义论文里已经十分清晰,这里不再赘述,要注意的是发送InstallSnapshot的时机。这里我们将其合并到了heartbeat中,在每次发送heartbeat的时候,如果某个peer的nextIndex小于等于baseIndex,那么就不能发送正常的AppendEntries,而是要发送InstallSnapshot
由于将InstallSnapshot与heartbeat合并,所以在InstallSnapshot的handler中要注意reset election timer,它的实现也不算复杂,基本上是处理heartbeat相关的操作之后就将snapshot给state machine执行
func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
DPrintf("Raft %d received InstallSnapshot: %s", rf.me, args.ToString())
reply.Term = rf.currentTerm
if args.Term < rf.currentTerm {
return
}
if args.Term > rf.currentTerm {
rf.currentTerm = args.Term
rf.votedFor = -1
rf.state = FOLLOWER
rf.persist()
}
rf.electionTimer.Reset(rf.GetElectionTimeout())
if args.LastIncludedIndex <= rf.commitIndex {
return
}
go func() {
rf.applyCh <- ApplyMsg{
SnapshotValid: true,
Snapshot: args.Data,
SnapshotTerm: args.LastIncludedTerm,
SnapshotIndex: args.LastIncludedIndex,
}
}()
}
对InstallSnapshot response的处理也要首先检查term,看看是否应当将自身切换成follower状态,其次是更新nextIndex, matchIndex为lastIncludedIndex + 1, lastIncludedIndex