2 minute read

这个实验同样分为两部分,第一部分修改之前的Raft实现,第二部分实现lab3

1. Raft实现优化

在实现了基础的lab 3之后进行测试,发现之前的Raft实现还是有问题,要对其进行修改

1.1. Start之后马上发起Sync

在lab 3实现时发现的第一个问题就是Raft太慢,导致lab 3中的某个测试失败,一开始花了很多时间进行乱七八糟的优化,比如尽量减少rf.persist()的调用,减少rf.log的copy等等,但测了好几次发现并没有什么显著的变化。

后来才发现是因为在Start调用接收到新的log之后没有马上发起另一轮的sync,所以要等到下一次heartbeat才会对新的log进行处理,故而在Start之后马上发起一轮Sync

for peer := range rf.peers {
	if peer == rf.me {
		continue
	}

	go rf.sync(peer)
}

1.2. Snapshot导致log过度收缩问题

加上了1.1之后多次运行会发现另一个问题,在AppendEntries的处理中,args.PrevLogIndex可能小于当前的rf.log[0].Index从而导致计算得出的index为负数,这一情况的原因是各个server自己处理Snapshot请求,所以很有可能的情况是某一个非leader server在处理了Snapshot之后log会压缩,而AppendEntries中的PrevLogIndex会指向已经收缩过的log index。

普遍一点来说,就是AppendEntries发过来的log batch可能有一部分已经压缩,而有一部分没有,这时候我们只要处理没有压缩的那一部分就可以了

for i, entry := range args.Entries {
	currLog := rf.getLogAtIndex(entry.Index)
	if currLog != nil {
		if currLog.Term != entry.Term {
			rf.log = rf.log[:entry.Index-rf.log[0].Index]
			left = i
			break
		}

		left = i + 1
	}
}

for i := left; i < len(args.Entries); i++ {
	entry := args.Entries[i]
	rf.log = append(rf.log, entry)
}

1.3. 统计Raft state size

这一部分lab 3会用到,在Raft state size大于某个值的时候开启压缩

2. Clerk与Server的实现

2.1. Clerk

Clerk的实现相对比较简单,主要有Get和PutAppend两个函数需要实现,他们总体的流程都是先根据参数构造相应的request,接着向Server发送对应的request并且等待回复。只有两个需要注意的地方:

第一,Clerk第一次访问的Server并不一定是leader,而Raft协议中非leader是无法handle请求的,所以如果不是leader的话要能够轮换,不断尝试下一个server,直到遇见leader为止

第二,为了做到idempotent,每一个PutAppend request要带上Clerk id与request id,这样如果网络突然中断,前一个request被Server处理成功,但是没能收到response,Clerk再次重试的时候,Server可以根据Clerk id和request id做去重

一个需要注意的问题是怎样assign Clerk id与request id,要尽量从大范围里选择随机数,防止Clerk id与request id的重复

2.2. Server

要实现Server我们首先要理清Server,也就是state machine与Raft的交互关系,正常的流程如下

  1. Server收到来自Clerk的request
  2. Server将request发给Raft,由Raft保证所有的Server,不论leader和follower都能成功将request commit,使得request不丢失
  3. 在Raft成功将request commit之后,通过applyCh告知Server,由Server处理request,更改state machine
  4. 在成功更改state machine之后由Server将response送回给Clerk

Server端的入口肯定是Get和PutAppend两个RPC handler,那么这两个函数要做的事情也很清晰了,首先要判断自己是不是leader,如果不是的话告知Clerk,之后将request给Raft并且等待Raft commit,那么这里问题就来了,我们怎样等待某个request呢?

考虑到每一个request在调用Raft之后都会得到一个index,我们可以考虑使用这个index作为key,而对应的value为一个接受结果的channel,Get/PutAppend在调用Raft之后就等待这个channel,得到了结果就返回;而另一方面我们使用一个后台goroutine不断监听applyCh,每当收到Raft commit之后,就改变state machine,然后将response放入结果channel。

另一个问题是我们怎么去重呢?一开始的想法是使用request中的Clerk idrequest id构建一个string,由它作为唯一的key,value为上次request的response。但是这样做有两个问题,首先是我们真的需要这两个id一起去重吗?回头思考一下我们做idempotent要防止的情况,是某一个Clerk retry一个已经成功的request也只能造成一次效果,那么我们只要存某个Clerk上一个request即可;第二个垃圾回收的问题也可以解决,因为Clerk的数量还是比较少的。

所以Server需要的额外状态如下

// last applied index
lastApplied int

// in-memory key-value map
storage map[string]string

// channels for inflight request
inflightChans map[int]chan *Reply

// last operation of each client
lastOperations map[int]*Reply

这里的inflightChans就是index与response的mapping,要注意构建inflightChan的时候要使用buffer size为1的buffered chan,否则会造成chan等待

//
// Get associated inflight channel for given index
//
func (kv *KVServer) getInflightChan(index int) chan *Reply {
	if _, ok := kv.inflightChans[index]; !ok {
		// https://stackoverflow.com/questions/23233381/whats-the-difference-between-c-makechan-int-and-c-makechan-int-1
		kv.inflightChans[index] = make(chan *Reply, 1)
	}

	return kv.inflightChans[index]
}

3B关于Snapshot也并不算复杂,每次Raft log数量有变化的时候看一下有没有超过预定的size,如果超过的话就将storagelastOperations做snapshot,由于本身是一个key-value server,所以storage已经自动做了compaction,如果是另外的存储形式的话可能要考虑其他方法,整个background goroutine的写法如下

//
// Background goroutine listening at applyCh to get Raft committed messages
//
func (kv *KVServer) applier() {
	for message := range kv.applyCh {
		if kv.killed() {
			return
		}

		DPrintf("Server %d Raft committed message %v", kv.me, message)

		if message.CommandValid {
			kv.mu.Lock()
			if message.CommandIndex <= kv.lastApplied {
				continue
			}
			kv.lastApplied = message.CommandIndex
			kv.mu.Unlock()

			op := message.Command.(Op)
			reply := Reply{RequestId: op.RequestId}
			if op.OpType != OPTYPE_GET && kv.checkDuplicate(&op) {
				// found duplicate
				DPrintf("Server %d found duplicate request: %s", kv.me, op.ToString())
				reply = *kv.lastOperations[op.ClientId]
			} else if op.OpType == OPTYPE_GET {
				DPrintf("Server %d applying Get: %s", kv.me, op.ToString())
				result, ok := kv.storage[op.Key]
				if ok {
					reply.Value = result
					reply.Err = OK
				} else {
					reply.Err = ErrNoKey
				}
			} else if op.OpType == OPTYPE_PUT {
				DPrintf("Server %d applying Put: %s", kv.me, op.ToString())
				kv.storage[op.Key] = op.Value
				reply.Err = OK

				kv.lastOperations[op.ClientId] = &reply
			} else if op.OpType == OPTYPE_APPEND {
				DPrintf("Server %d applying Append: %s", kv.me, op.ToString())
				result, ok := kv.storage[op.Key]
				if ok {
					kv.storage[op.Key] = result + op.Value
				} else {
					kv.storage[op.Key] = op.Value
				}
				reply.Err = OK

				kv.lastOperations[op.ClientId] = &reply
			}

			_, isLeader := kv.rf.GetState()
			if isLeader {
				DPrintf("Server %d is leader, sending message back to index %d", kv.me, message.CommandIndex)
				kv.mu.Lock()
				ch := kv.getInflightChan(message.CommandIndex)
				kv.mu.Unlock()

				ch <- &reply
			}

			if kv.rf.GetStateSize() >= kv.maxraftstate && kv.maxraftstate != -1 {
				DPrintf("Server %d creating snapshot: kv.rf.GetStateSize(%d) >= kv.maxraftstate(%d)", kv.me, kv.rf.GetStateSize(), kv.maxraftstate)
				w := new(bytes.Buffer)
				e := labgob.NewEncoder(w)
				e.Encode(kv.lastOperations)
				e.Encode(kv.storage)
				kv.rf.Snapshot(message.CommandIndex, w.Bytes())
			}
		} else {
			ok := kv.rf.CondInstallSnapshot(message.SnapshotTerm, message.SnapshotIndex, message.Snapshot)
			DPrintf("Server %d calling CondInstallSnapshot", kv.me)
			if ok {
				kv.lastApplied = message.SnapshotIndex
				kv.readSnapshot(message.Snapshot)
			}
		}
	}
}