2 minute read

这是lab 2d的最后一个实验,但是实现却很费力,因为花了很大力气重写之前的部分代码,一不小心就会导致错误,而且非常难调。这篇文章分两部分,首先总结下为2d所做的调整,第二部分再介绍2d的具体实现

1. 实现优化

1.1. 删除checkCommit函数

在之前的实现中,使用了一个checkCommitTimer,每10ms调用一次,它的作用是检测是否有可以被commit的index。但是我们回过头来想想commit index这件事情,它需要所有peer的matchIndex在某个N上达到majority,所以我们只需要在matchIndex有改变的时候来检测是否能进行commit即可

那么在什么时候matchIndex会有变化呢?目前来看有两个,第一是某个peer回复AppendEntries成功;第二个是某个peer installSnapshot成功(具体后面会提到),而对于installSnapshot,可以保证snpshot中的data已经被leader commit过,所以只有在处理AppendEntries response的时候会发生,故而我们在这里进行检测,删除之前每10ms调用一次的timer

baseIndex := rf.log[0].Index
				updatedCommitIndex := rf.commitIndex
				for i := rf.commitIndex + 1; i <= rf.getLastLog().Index; i++ {
					N := i

					replicateCount := 1
					for peer := range rf.peers {
						if peer == rf.me {
							continue
						}

						if rf.matchIndex[peer] >= N {
							replicateCount += 1
						}

						DPrintf("Raft %d comparing index at %d for peer %d: rf.matchIndex[peer]=%d, N=%d, log=%v", rf.me, N, peer, rf.matchIndex[peer], N, rf.log[N-baseIndex])
						if replicateCount*2 > len(rf.peers) && rf.log[N-baseIndex].Term == rf.currentTerm {
							DPrintf("Raft %d received majority at index %d, will set rf.commitIndex to %d", rf.me, N, N)
							updatedCommitIndex = N
							break
						}
					}
				}

				rf.commitIndex = updatedCommitIndex

				if rf.commitIndex > rf.lastApplied {
					rf.cond.Broadcast()
				}

1.2. 将index加入LogEntry中,停止使用len(rf.log)

之前的实现里使用len(rf.log)来得到当前最后的一个LogEntry的index,在2a,2b和2c中这样做没问题,因为log不会缩减,但是在2d中,log的前一部分随时可能会被切掉。因此再使用len(rf.log)会带来很大的问题,比较优雅的解决方案是在LogEntry中加入index,并且使用log[0]来存储baseIndex,实现log index与commitIndex/lastApplied的转换

在Start函数,也就是有新的log要加入时,我们根据当前log最后一个LogEntry的index计算出新log的index

index := rf.getLastLog().Index + 1

newLogEntry := LogEntry{Term: term, Command: command, Index: index}

1.3. 异步applier中进行批处理

之前的实现里,异步applier中有一个for循环,在for循环中每次只会apply一个LogEntry,但其实当conditional variable被触发之后,所有在[rf.lastApplied, rf.commitIndex]之前的LogEntry都是可以被apply的

因此我们修改代码在一个for循环迭代中将这个batch给apply,但要注意的是在apply的过程中(将LogEntry送到对应channel)需要释放锁

for !rf.killed() {
		rf.mu.Lock()

		for rf.commitIndex <= rf.lastApplied {
			rf.cond.Wait()
		}

		commitIndex := rf.commitIndex
		lastApplied := rf.lastApplied

		entries := make([]LogEntry, commitIndex-lastApplied)
		baseIndex := rf.log[0].Index
		DPrintf("Raft %d prepare to apply log, rf.lastApplied=%d, baseIndex=%d, %v", rf.me, rf.lastApplied, baseIndex, rf.log)
		copy(entries, rf.log[lastApplied+1-baseIndex:commitIndex+1-baseIndex])
		rf.mu.Unlock()
		for _, entry := range entries {
			DPrintf("Raft %d will apply log at %d: %v", rf.me, entry.Index, entry)
			rf.applyCh <- ApplyMsg{
				CommandValid: true,
				Command:      entry.Command,
				CommandIndex: entry.Index,
			}
        }
        ...
}

1.4. 修改AppendEntries handler中的bug

之前的实现中有一个bug没有测出来,就是在AppendEntries的实现中会直接把votedFor设置为-1。正确的做法是只有request中的term大于当前term的时候才将votedFor设置为-1,原因是如果在当前term中就将votedFor设置为-1,那么很有可能这个node在同一个term中会对不同的node在RequestVote中投多次票,肯定是错误的

2. Log Compaction

2.1. len(rf.log)

这里有提到一次len(rf.log)是因为它实在太重要而且容易错,要仔细地把几乎所有的len(rf.log)替换成rf.getLastLog().Index,比如在比较log up-to-date时,其余的不再赘述

2.2. Snapshot和CondInstallSnapshot

Snapshot将up-to index的所有log替换成snapshot

func (rf *Raft) Snapshot(index int, snapshot []byte) {
	// Your code here (2D).
	rf.mu.Lock()
	defer rf.mu.Unlock()

	prevSnapshotIndex := rf.log[0].Index
	if index <= prevSnapshotIndex {
		// this snapshot is already included, do nothing
		return
	}

	rf.log = rf.log[index-prevSnapshotIndex:]
	rf.log[0].Command = nil

	DPrintf("Raft %d applied Snapshot up to %d, setting base index to %d, length %d", rf.me, index, rf.log[0].Index, len(rf.log))
	rf.persister.SaveStateAndSnapshot(rf.serializeState(), snapshot)
}

注意这里其实不需要设置rf.log[0].Index为参数index,因为在truncate log的时候这一步已经自动完成

CondInstallSnapshot实现中要注意三点,首先如果当前的commitIndex大于lastIncludedIndex没太大必要install snapshot,因为本地已经包括了这个snapshot的数据;第二点就是如果本地的log落后于snapshot,应该创建一个长度为1的log,使用index 0保存baseIndex;最后要使用lastIncludedTerm, lastIncludedIndex对自身的状态进行更新

func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool {
	// Your code here (2D).
	rf.mu.Lock()
	defer rf.mu.Unlock()

	if lastIncludedIndex <= rf.commitIndex {
		return false
	}

	if lastIncludedIndex > rf.getLastLog().Index {
		rf.log = make([]LogEntry, 1)
	} else {
		rf.log = rf.log[lastIncludedIndex-rf.log[0].Index:]
	}

	rf.log[0].Term = lastIncludedTerm
	rf.log[0].Index = lastIncludedIndex
	DPrintf("Raft %d setting rf.lastApplied to %d", rf.me, lastIncludedIndex)
	rf.lastApplied = lastIncludedIndex
	rf.commitIndex = lastIncludedIndex

	DPrintf("Raft %d processed CondInstallSnapshot, shrinking log base index to %d, length %d", rf.me, rf.log[0].Index, len(rf.log))
	rf.persister.SaveStateAndSnapshot(rf.serializeState(), snapshot)

	return true
}

Snapshot和CondInstallSnapshot的作用对象不同,Snapshot是state machine调用leader,而CondInstallSnapshot则是state machine调用follower

2.3. InstallSnapshot RPC

InstallSnapshot RPC request/response的定义论文里已经十分清晰,这里不再赘述,要注意的是发送InstallSnapshot的时机。这里我们将其合并到了heartbeat中,在每次发送heartbeat的时候,如果某个peer的nextIndex小于等于baseIndex,那么就不能发送正常的AppendEntries,而是要发送InstallSnapshot

由于将InstallSnapshot与heartbeat合并,所以在InstallSnapshot的handler中要注意reset election timer,它的实现也不算复杂,基本上是处理heartbeat相关的操作之后就将snapshot给state machine执行

func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	DPrintf("Raft %d received InstallSnapshot: %s", rf.me, args.ToString())

	reply.Term = rf.currentTerm
	if args.Term < rf.currentTerm {
		return
	}

	if args.Term > rf.currentTerm {
		rf.currentTerm = args.Term
		rf.votedFor = -1
		rf.state = FOLLOWER
		rf.persist()
	}

	rf.electionTimer.Reset(rf.GetElectionTimeout())

	if args.LastIncludedIndex <= rf.commitIndex {
		return
	}

	go func() {
		rf.applyCh <- ApplyMsg{
			SnapshotValid: true,
			Snapshot:      args.Data,
			SnapshotTerm:  args.LastIncludedTerm,
			SnapshotIndex: args.LastIncludedIndex,
		}
	}()
}

对InstallSnapshot response的处理也要首先检查term,看看是否应当将自身切换成follower状态,其次是更新nextIndex, matchIndex为lastIncludedIndex + 1, lastIncludedIndex