MIT 6824 Lab 3总结
这个实验同样分为两部分,第一部分修改之前的Raft实现,第二部分实现lab3
1. Raft实现优化
在实现了基础的lab 3之后进行测试,发现之前的Raft实现还是有问题,要对其进行修改
1.1. Start之后马上发起Sync
在lab 3实现时发现的第一个问题就是Raft太慢,导致lab 3中的某个测试失败,一开始花了很多时间进行乱七八糟的优化,比如尽量减少rf.persist()
的调用,减少rf.log
的copy等等,但测了好几次发现并没有什么显著的变化。
后来才发现是因为在Start调用接收到新的log之后没有马上发起另一轮的sync,所以要等到下一次heartbeat才会对新的log进行处理,故而在Start之后马上发起一轮Sync
for peer := range rf.peers {
if peer == rf.me {
continue
}
go rf.sync(peer)
}
1.2. Snapshot导致log过度收缩问题
加上了1.1之后多次运行会发现另一个问题,在AppendEntries的处理中,args.PrevLogIndex
可能小于当前的rf.log[0].Index
从而导致计算得出的index为负数,这一情况的原因是各个server自己处理Snapshot请求,所以很有可能的情况是某一个非leader server在处理了Snapshot之后log会压缩,而AppendEntries中的PrevLogIndex会指向已经收缩过的log index。
普遍一点来说,就是AppendEntries发过来的log batch可能有一部分已经压缩,而有一部分没有,这时候我们只要处理没有压缩的那一部分就可以了
for i, entry := range args.Entries {
currLog := rf.getLogAtIndex(entry.Index)
if currLog != nil {
if currLog.Term != entry.Term {
rf.log = rf.log[:entry.Index-rf.log[0].Index]
left = i
break
}
left = i + 1
}
}
for i := left; i < len(args.Entries); i++ {
entry := args.Entries[i]
rf.log = append(rf.log, entry)
}
1.3. 统计Raft state size
这一部分lab 3会用到,在Raft state size大于某个值的时候开启压缩
2. Clerk与Server的实现
2.1. Clerk
Clerk的实现相对比较简单,主要有Get和PutAppend两个函数需要实现,他们总体的流程都是先根据参数构造相应的request,接着向Server发送对应的request并且等待回复。只有两个需要注意的地方:
第一,Clerk第一次访问的Server并不一定是leader,而Raft协议中非leader是无法handle请求的,所以如果不是leader的话要能够轮换,不断尝试下一个server,直到遇见leader为止
第二,为了做到idempotent,每一个PutAppend request要带上Clerk id与request id,这样如果网络突然中断,前一个request被Server处理成功,但是没能收到response,Clerk再次重试的时候,Server可以根据Clerk id和request id做去重
一个需要注意的问题是怎样assign Clerk id与request id,要尽量从大范围里选择随机数,防止Clerk id与request id的重复
2.2. Server
要实现Server我们首先要理清Server,也就是state machine与Raft的交互关系,正常的流程如下
- Server收到来自Clerk的request
- Server将request发给Raft,由Raft保证所有的Server,不论leader和follower都能成功将request commit,使得request不丢失
- 在Raft成功将request commit之后,通过
applyCh
告知Server,由Server处理request,更改state machine - 在成功更改state machine之后由Server将response送回给Clerk
Server端的入口肯定是Get和PutAppend两个RPC handler,那么这两个函数要做的事情也很清晰了,首先要判断自己是不是leader,如果不是的话告知Clerk,之后将request给Raft并且等待Raft commit,那么这里问题就来了,我们怎样等待某个request呢?
考虑到每一个request在调用Raft之后都会得到一个index,我们可以考虑使用这个index作为key,而对应的value为一个接受结果的channel,Get/PutAppend在调用Raft之后就等待这个channel,得到了结果就返回;而另一方面我们使用一个后台goroutine不断监听applyCh
,每当收到Raft commit之后,就改变state machine,然后将response放入结果channel。
另一个问题是我们怎么去重呢?一开始的想法是使用request中的Clerk id
和request id
构建一个string,由它作为唯一的key,value为上次request的response。但是这样做有两个问题,首先是我们真的需要这两个id一起去重吗?回头思考一下我们做idempotent要防止的情况,是某一个Clerk retry一个已经成功的request也只能造成一次效果,那么我们只要存某个Clerk上一个request即可;第二个垃圾回收的问题也可以解决,因为Clerk的数量还是比较少的。
所以Server需要的额外状态如下
// last applied index
lastApplied int
// in-memory key-value map
storage map[string]string
// channels for inflight request
inflightChans map[int]chan *Reply
// last operation of each client
lastOperations map[int]*Reply
这里的inflightChans
就是index与response的mapping,要注意构建inflightChan
的时候要使用buffer size为1的buffered chan,否则会造成chan等待
//
// Get associated inflight channel for given index
//
func (kv *KVServer) getInflightChan(index int) chan *Reply {
if _, ok := kv.inflightChans[index]; !ok {
// https://stackoverflow.com/questions/23233381/whats-the-difference-between-c-makechan-int-and-c-makechan-int-1
kv.inflightChans[index] = make(chan *Reply, 1)
}
return kv.inflightChans[index]
}
3B关于Snapshot也并不算复杂,每次Raft log数量有变化的时候看一下有没有超过预定的size,如果超过的话就将storage
和lastOperations
做snapshot,由于本身是一个key-value server,所以storage
已经自动做了compaction,如果是另外的存储形式的话可能要考虑其他方法,整个background goroutine的写法如下
//
// Background goroutine listening at applyCh to get Raft committed messages
//
func (kv *KVServer) applier() {
for message := range kv.applyCh {
if kv.killed() {
return
}
DPrintf("Server %d Raft committed message %v", kv.me, message)
if message.CommandValid {
kv.mu.Lock()
if message.CommandIndex <= kv.lastApplied {
continue
}
kv.lastApplied = message.CommandIndex
kv.mu.Unlock()
op := message.Command.(Op)
reply := Reply{RequestId: op.RequestId}
if op.OpType != OPTYPE_GET && kv.checkDuplicate(&op) {
// found duplicate
DPrintf("Server %d found duplicate request: %s", kv.me, op.ToString())
reply = *kv.lastOperations[op.ClientId]
} else if op.OpType == OPTYPE_GET {
DPrintf("Server %d applying Get: %s", kv.me, op.ToString())
result, ok := kv.storage[op.Key]
if ok {
reply.Value = result
reply.Err = OK
} else {
reply.Err = ErrNoKey
}
} else if op.OpType == OPTYPE_PUT {
DPrintf("Server %d applying Put: %s", kv.me, op.ToString())
kv.storage[op.Key] = op.Value
reply.Err = OK
kv.lastOperations[op.ClientId] = &reply
} else if op.OpType == OPTYPE_APPEND {
DPrintf("Server %d applying Append: %s", kv.me, op.ToString())
result, ok := kv.storage[op.Key]
if ok {
kv.storage[op.Key] = result + op.Value
} else {
kv.storage[op.Key] = op.Value
}
reply.Err = OK
kv.lastOperations[op.ClientId] = &reply
}
_, isLeader := kv.rf.GetState()
if isLeader {
DPrintf("Server %d is leader, sending message back to index %d", kv.me, message.CommandIndex)
kv.mu.Lock()
ch := kv.getInflightChan(message.CommandIndex)
kv.mu.Unlock()
ch <- &reply
}
if kv.rf.GetStateSize() >= kv.maxraftstate && kv.maxraftstate != -1 {
DPrintf("Server %d creating snapshot: kv.rf.GetStateSize(%d) >= kv.maxraftstate(%d)", kv.me, kv.rf.GetStateSize(), kv.maxraftstate)
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(kv.lastOperations)
e.Encode(kv.storage)
kv.rf.Snapshot(message.CommandIndex, w.Bytes())
}
} else {
ok := kv.rf.CondInstallSnapshot(message.SnapshotTerm, message.SnapshotIndex, message.Snapshot)
DPrintf("Server %d calling CondInstallSnapshot", kv.me)
if ok {
kv.lastApplied = message.SnapshotIndex
kv.readSnapshot(message.Snapshot)
}
}
}
}