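# Read Index

Here is a quick overview of the Read Index read path. When the leader receives a read request, it:

1. Records the current commit index in a local variable, readIndex, and includes it in the message.
2. Confirms that it is still the leader by sending a heartbeat to every other node.
3. Treats a heartbeat response from a majority of nodes as proof that it is still the leader.
4. Waits, at the point where the state machine applies entries, until the apply index has reached readIndex. At that moment every log entry that existed when the read was issued has been applied, which satisfies the requirements of a linearizable read.
5. Reads the result from the state machine and returns it to the client.

Compared with the previous approach, Read Index first uses a heartbeat round to confirm that the node is still the leader, and then waits until the state machine has applied every log entry that existed when the read was issued. After that, the client request can be served safely. The heartbeat still costs one network round trip, but heartbeat messages are very small and handling them is cheap (for example, nothing has to be written to disk), so this approach performs much better than the previous one.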
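The leader-side steps above can be condensed into a small, single-threaded Java sketch. Everything in it is hypothetical (the class `LeaderReadIndexSketch`, its fields, and modelling heartbeat responses as a list of booleans); it is not sofa-jraft code and leaves out networking and concurrency. It only shows the order of the checks: record readIndex, require acknowledgements from a majority (counting the leader itself), then serve the read once `appliedIndex >= readIndex`.

```java
import java.util.List;

// Hypothetical sketch of the leader-side Read Index steps (not sofa-jraft code).
// Heartbeat responses are modelled as booleans: true = the follower acknowledged.
class LeaderReadIndexSketch {
    private final int clusterSize;  // voting members, including the leader
    private final long commitIndex; // leader's commit index when the read arrives
    long appliedIndex;              // last index applied by the local state machine

    LeaderReadIndexSketch(int clusterSize, long commitIndex, long appliedIndex) {
        this.clusterSize = clusterSize;
        this.commitIndex = commitIndex;
        this.appliedIndex = appliedIndex;
    }

    // Steps 1-3: remember readIndex, then confirm leadership with one heartbeat round.
    // Returns readIndex, or -1 if a majority did not acknowledge.
    long confirmReadIndex(List<Boolean> heartbeatAcks) {
        final long readIndex = this.commitIndex;
        final int quorum = this.clusterSize / 2 + 1;
        int acks = 1; // the leader counts itself
        for (boolean ok : heartbeatAcks) {
            if (ok) {
                acks++;
            }
        }
        return acks >= quorum ? readIndex : -1;
    }

    // Step 4: the read may be served only once the state machine has applied up to readIndex.
    boolean canServe(long readIndex) {
        return readIndex >= 0 && this.appliedIndex >= readIndex;
    }

    public static void main(String[] args) {
        LeaderReadIndexSketch leader = new LeaderReadIndexSketch(3, 10, 8);
        long readIndex = leader.confirmReadIndex(List.of(true, false));
        System.out.println(readIndex + " " + leader.canServe(readIndex)); // 10 false
        leader.appliedIndex = 10; // the state machine catches up
        System.out.println(leader.canServe(readIndex));                   // true
    }
}
```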
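With Read Index, followers can serve reads too, and those reads can also be linearizable. The logic differs slightly from the leader's:

1. The follower asks the leader for the latest readIndex.
2. The leader runs the flow described above, but as soon as it has confirmed its leadership it returns readIndex directly to the follower.
3. The follower waits until its own state machine has applied up to readIndex, and then serves the client's read request safely.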
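A minimal sketch of the follower side, under stated assumptions: the leader round trip is abstracted as a `LongSupplier` that already returns a confirmed readIndex, and `FollowerReadSketch` and its methods are hypothetical names, not sofa-jraft API. The interesting part is the wait: the reader blocks on a monitor until the apply thread reports an index at least as large as readIndex.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of a follower-side linearizable read (not sofa-jraft code).
class FollowerReadSketch {
    private long appliedIndex; // guarded by "this"

    // Called by the (simulated) apply thread after each log entry is applied.
    synchronized void onApplied(long index) {
        this.appliedIndex = index;
        notifyAll(); // wake any reader waiting for this index
    }

    // Step 1: get readIndex from the leader. Step 3: wait until it is applied locally.
    long read(LongSupplier askLeader) throws InterruptedException {
        final long readIndex = askLeader.getAsLong(); // leader has confirmed leadership
        waitUntilApplied(readIndex);
        return readIndex; // now safe to read from the local state machine
    }

    private synchronized void waitUntilApplied(long readIndex) throws InterruptedException {
        while (this.appliedIndex < readIndex) {
            wait(); // loop guards against spurious wake-ups
        }
    }

    public static void main(String[] args) throws InterruptedException {
        FollowerReadSketch follower = new FollowerReadSketch();
        Thread applier = new Thread(() -> {
            for (long i = 1; i <= 5; i++) {
                follower.onApplied(i);
            }
        });
        applier.start();
        long readIndex = follower.read(() -> 5L); // pretend the leader replied readIndex = 5
        applier.join();
        System.out.println("served read at index " + readIndex);
    }
}
```

Calling `askLeader` outside the monitor keeps a slow leader round trip from blocking the apply thread, which needs the same lock to publish progress.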
Next, let's look at how sofa-jraft implements Read Index. On the application side, a read is started like this:
```java
// Pass the request ID in as the request context
final byte[] reqContext = new byte[4];
Bits.putInt(reqContext, 0, requestId.incrementAndGet());
// Call readIndex; the callback runs once the read can proceed (or has failed)
this.node.readIndex(reqContext, new ReadIndexClosure() {

    @Override
    public void run(Status status, long index, byte[] reqCtx) {
        if (status.isOk()) {
            // Handle the user's read request
        } else {
            // The read fails in certain situations, for example during an election
            asyncContext.sendResponse(new BooleanCommand(false, status.getErrorMsg()));
        }
    }
});
```
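The 4-byte request context is handed back to the closure as `reqCtx`, so the callback can tell which request it belongs to. The sketch below illustrates that round trip with `java.nio.ByteBuffer` standing in for sofa-jraft's `Bits` helper; `RequestContextSketch` is a hypothetical name, and the sketch does not rely on `Bits.putInt` using the same byte order as `ByteBuffer` (big-endian by default).

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the request-context handling above;
// uses java.nio.ByteBuffer instead of sofa-jraft's Bits helper.
class RequestContextSketch {
    private static final AtomicInteger REQUEST_ID = new AtomicInteger();

    // Encode the next request ID into a 4-byte context.
    static byte[] newContext() {
        return ByteBuffer.allocate(4).putInt(REQUEST_ID.incrementAndGet()).array();
    }

    // Decode the ID inside the callback to match the result with its request.
    static int requestIdOf(byte[] reqCtx) {
        return ByteBuffer.wrap(reqCtx).getInt();
    }

    public static void main(String[] args) {
        byte[] ctx = newContext();
        System.out.println("callback for request " + requestIdOf(ctx));
    }
}
```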
`Node#readIndex(byte[] requestContext, ReadIndexClosure done)` issues a linearizable read request:
```java
private ReadOnlyService readOnlyService;

@Override
public void readIndex(final byte[] requestContext, final ReadIndexClosure done) {
    if (this.shutdownLatch != null) {
        Utils.runClosureInThread(done, new Status(RaftError.ENODESHUTDOWN, "Node is shutting down."));
        throw new IllegalStateException("Node is shutting down");
    }
    Requires.requireNonNull(done, "Null closure");
    // Asynchronous: add the request to the ReadIndex queue
    this.readOnlyService.addRequest(requestContext, done);
}
```
```java
public class ReadOnlyServiceImpl implements ReadOnlyService, LastAppliedLogIndexListener {

    /** Disruptor to run readonly service. */
    private Disruptor<ReadIndexEvent> readIndexDisruptor;
    private RingBuffer<ReadIndexEvent> readIndexQueue;
    // Other fields omitted in this excerpt

    @Override
    public boolean init(final ReadOnlyServiceOptions opts) {
        this.node = opts.getNode();
        this.nodeMetrics = this.node.getNodeMetrics();
        this.fsmCaller = opts.getFsmCaller();
        this.raftOptions = opts.getRaftOptions();
        this.scheduledExecutorService = Executors
            .newSingleThreadScheduledExecutor(new NamedThreadFactory("ReadOnlyService-PendingNotify-Scanner", true));
        this.readIndexDisruptor = DisruptorBuilder.<ReadIndexEvent> newInstance() //
            .setEventFactory(new ReadIndexEventFactory()) //
            .setRingBufferSize(this.raftOptions.getDisruptorBufferSize()) //
            .setThreadFactory(new NamedThreadFactory("JRaft-ReadOnlyService-Disruptor-", true)) //
            .setWaitStrategy(new BlockingWaitStrategy()) //
            .setProducerType(ProducerType.MULTI) //
            .build();
        // Consumer
        this.readIndexDisruptor.handleEventsWith(new ReadIndexEventHandler());
        this.readIndexDisruptor
            .setDefaultExceptionHandler(new LogExceptionHandler<Object>(this.getClass().getSimpleName()));
        this.readIndexQueue = this.readIndexDisruptor.start();
        if (this.nodeMetrics.getMetricRegistry() != null) {
            this.nodeMetrics.getMetricRegistry().register("jraft-read-only-service-disruptor", new DisruptorMetricSet(this.readIndexQueue));
        }
        // listen on lastAppliedLogIndex change events.
        this.fsmCaller.addLastAppliedLogIndexListener(this);
        // start scanner
        this.scheduledExecutorService.scheduleAtFixedRate(() -> onApplied(this.fsmCaller.getLastAppliedIndex()),
            this.raftOptions.getMaxElectionDelayMs(), this.raftOptions.getMaxElectionDelayMs(), TimeUnit.MILLISECONDS);
        return true;
    }

    @Override
    public void addRequest(final byte[] reqCtx, final ReadIndexClosure closure) {
        if (this.shutdownLatch != null) {
            // The service has been shut down: fail the request immediately
            Utils.runClosureInThread(closure, new Status(RaftError.EHOSTDOWN, "Was stopped"));
            throw new IllegalStateException("Service already shutdown.");
        }
        try {
            EventTranslator<ReadIndexEvent> translator = (event, sequence) -> {
                event.done = closure;                     // callback
                event.requestContext = new Bytes(reqCtx); // request context
                event.startTime = Utils.monotonicMs();    // record the current timestamp
            };
            int retryTimes = 0;
            while (true) {
                // Try to publish the event to the queue
                if (this.readIndexQueue.tryPublishEvent(translator)) {
                    break;
                } else {
                    // On failure, retry at most MAX_ADD_REQUEST_RETRY_TIMES (3) times
                    retryTimes++;
                    if (retryTimes > MAX_ADD_REQUEST_RETRY_TIMES) {
                        Utils.runClosureInThread(closure,
                            new Status(RaftError.EBUSY, "Node is busy, has too many read-only requests."));
                        this.nodeMetrics.recordTimes("read-index-overload-times", 1);
                        LOG.warn("Node {} ReadOnlyServiceImpl readIndexQueue is overload.", this.node.getNodeId());
                        return;
                    }
                    // Spin-wait hint before retrying, so busy-retrying is cheaper for the CPU
                    ThreadHelper.onSpinWait();
                }
            }
        } catch (final Exception e) {
            Utils.runClosureInThread(closure, new Status(RaftError.EPERM, "Node is down."));
        }
    }
}
```
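The bounded-retry loop in `addRequest` can be illustrated without the Disruptor dependency. In this sketch a `java.util.concurrent.ArrayBlockingQueue` replaces the RingBuffer (its non-blocking `offer` plays the role of `tryPublishEvent`), and the class name and the `String` payload are hypothetical; only the retry logic mirrors the code above.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical sketch of addRequest's bounded-retry publish, with an
// ArrayBlockingQueue standing in for the Disruptor RingBuffer.
class BoundedRetryPublisher {
    static final int MAX_RETRY_TIMES = 3; // mirrors MAX_ADD_REQUEST_RETRY_TIMES
    private final ArrayBlockingQueue<String> queue;

    BoundedRetryPublisher(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Returns true if enqueued; false means "busy, too many read-only requests".
    boolean publish(String request) {
        int retryTimes = 0;
        while (true) {
            if (this.queue.offer(request)) { // non-blocking, like tryPublishEvent
                return true;
            }
            retryTimes++;
            if (retryTimes > MAX_RETRY_TIMES) {
                return false;                // the real code fails the closure with EBUSY here
            }
            Thread.onSpinWait();             // CPU hint while busy-retrying (Java 9+)
        }
    }

    public static void main(String[] args) {
        BoundedRetryPublisher p = new BoundedRetryPublisher(1);
        System.out.println(p.publish("r1") + " " + p.publish("r2")); // true false
    }
}
```

Failing fast instead of blocking keeps a flood of reads from stalling the caller's thread; the client sees the error and can back off and retry.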
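This is a classic producer-consumer model, with a Disruptor `RingBuffer` as the underlying queue. Its main job here is to merge ReadIndex requests, which illustrates one of the most common performance optimizations, batching: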
```java
private class ReadIndexEventHandler implements EventHandler<ReadIndexEvent> {
    // task list for batch
    private final List<ReadIndexEvent> events = new ArrayList<>(
        ReadOnlyServiceImpl.this.raftOptions.getApplyBatch());

    @Override
    public void onEvent(final ReadIndexEvent newEvent, final long sequence, final boolean endOfBatch)
        throws Exception {
        if (newEvent.shutdownLatch != null) {
            executeReadIndexEvents(this.events);
            this.events.clear();
            newEvent.shutdownLatch.countDown();
            return;
        }
        this.events.add(newEvent);
        // Merge ReadIndex requests: flush when applyBatch (32 by default) is reached
        // or at the end of the current Disruptor batch
        if (this.events.size() >= ReadOnlyServiceImpl.this.raftOptions.getApplyBatch() || endOfBatch) {
            executeReadIndexEvents(this.events);
            this.events.clear();
        }
    }
}

private void executeReadIndexEvents(final List<ReadIndexEvent> events) {
    if (events.isEmpty()) {
        return;
    }
    // Build one ReadIndexRequest carrying the contexts of every batched event
    final ReadIndexRequest.Builder rb = ReadIndexRequest.newBuilder() //
        .setGroupId(this.node.getGroupId()) //
        .setServerId(this.node.getServerId().toString());
    final List<ReadIndexState> states = new ArrayList<>(events.size());
    for (final ReadIndexEvent event : events) {
        rb.addEntries(ZeroByteStringHelper.wrap(event.requestContext.get()));
        states.add(new ReadIndexState(event.requestContext, event.done, event.startTime));
    }
    final ReadIndexRequest request = rb.build();
    this.node.handleReadIndexRequest(request, new ReadIndexResponseClosure(states, request));
}
```
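To see what batching buys, here is a sketch of the flush rule in `ReadIndexEventHandler`, with strings in place of `ReadIndexEvent`s and a callback in place of `executeReadIndexEvents` (both assumptions for illustration; `BatchingHandlerSketch` is a hypothetical name). Each flush corresponds to one `ReadIndexRequest`, and therefore to one leadership check on the leader.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of the batching rule: buffer events, then flush them as one
// merged request when the batch is full or the Disruptor signals endOfBatch.
class BatchingHandlerSketch {
    private final int batchSize;                // plays the role of applyBatch
    private final List<String> events = new ArrayList<>();
    private final Consumer<List<String>> flush; // stands in for executeReadIndexEvents

    BatchingHandlerSketch(int batchSize, Consumer<List<String>> flush) {
        this.batchSize = batchSize;
        this.flush = flush;
    }

    void onEvent(String event, boolean endOfBatch) {
        this.events.add(event);
        if (this.events.size() >= this.batchSize || endOfBatch) {
            this.flush.accept(new ArrayList<>(this.events)); // one merged request per flush
            this.events.clear();
        }
    }

    public static void main(String[] args) {
        List<List<String>> requests = new ArrayList<>();
        BatchingHandlerSketch h = new BatchingHandlerSketch(3, requests::add);
        String[] ctx = {"a", "b", "c", "d", "e"};
        for (int i = 0; i < ctx.length; i++) {
            h.onEvent(ctx[i], i == ctx.length - 1); // last event ends the Disruptor batch
        }
        System.out.println(requests); // [[a, b, c], [d, e]]
    }
}
```

With a batch size of 3, five reads that arrive back to back produce two merged requests instead of five, so the leader pays for two heartbeat rounds rather than five.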
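As you can see, the actual work is done by `Node#handleReadIndexRequest`. Note that the requests have already been merged by the layer above, and that a `ReadIndexResponseClosure` callback is passed in; it runs once the ReadIndex response is available, that is, after the leader has confirmed its leadership.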
```java
/**
 * Handle read index request.
 */
@Override
public void handleReadIndexRequest(final ReadIndexRequest request, final RpcResponseClosure<ReadIndexResponse> done) {
    final long startMs = Utils.monotonicMs();
    this.readLock.lock();
    try {
        switch (this.state) {
            // Leader: handle the request directly
            case STATE_LEADER:
                readLeader(request, ReadIndexResponse.newBuilder(), done);
                break;
            case STATE_FOLLOWER:
                // Follower: ask the leader for readIndex
                readFollower(request, done);
                break;
            case STATE_TRANSFERRING:
                // Leadership is being transferred: fail immediately
                done.run(new Status(RaftError.EBUSY, "Is transferring leadership."));
                break;
            default:
                done.run(new Status(RaftError.EPERM, "Invalid state for readIndex: %s.", this.state));
                break;
        }
    } finally {
        this.readLock.unlock();
        this.metrics.recordLatency("handle-read-index", Utils.monotonicMs() - startMs);
        this.metrics.recordSize("handle-read-index-entries", request.getEntriesCount());
    }
}

// Follower: forward the request to the leader to obtain readIndex
private void readFollower(final ReadIndexRequest request, final RpcResponseClosure<ReadIndexResponse> closure) {
    if (this.leaderId == null || this.leaderId.isEmpty()) {
        closure.run(new Status(RaftError.EPERM, "No leader at term %d.", this.currTerm));
        return;
    }
    // send request to leader.
    final ReadIndexRequest newRequest = ReadIndexRequest.newBuilder() //
        .mergeFrom(request) //
        .setPeerId(this.leaderId.toString()) //
        .build();
    this.rpcService.readIndex(this.leaderId.getEndpoint(), newRequest, -1, closure);
}
```
```java
private void readLeader(final ReadIndexRequest request, final ReadIndexResponse.Builder respBuilder,
                        final RpcResponseClosure<ReadIndexResponse> closure) {
    final int quorum = getQuorum();
    if (quorum <= 1) {
        // Only one peer, fast path.
        respBuilder.setSuccess(true) //
            .setIndex(this.ballotBox.getLastCommittedIndex());
        closure.setResponse(respBuilder.build());
        closure.run(Status.OK());
        return;
    }
    // Record the current commit index: this is readIndex
    final long lastCommittedIndex = this.ballotBox.getLastCommittedIndex();
    // A newly elected leader must first commit an entry in its own term;
    // until then it cannot be sure its commit index is up to date
    if (this.logManager.getTerm(lastCommittedIndex) != this.currTerm) {
        // Reject read only request when this leader has not committed any log entry at its term
        closure
            .run(new Status(
                RaftError.EAGAIN,
                "ReadIndex request rejected because leader has not committed any log entry at its term, logIndex=%d, currTerm=%d.",
                lastCommittedIndex, this.currTerm));
        return;
    }
    respBuilder.setIndex(lastCommittedIndex);
    // If the request came from a follower, check that it is still in the current configuration
    if (request.getPeerId() != null) {
        // request from follower, check if the follower is in current conf.
        final PeerId peer = new PeerId();
        peer.parse(request.getServerId());
        if (!this.conf.contains(peer)) {
            closure
                .run(new Status(RaftError.EPERM, "Peer %s is not in current configuration: %s.", peer, this.conf));
            return;
        }
    }
    ReadOnlyOption readOnlyOpt = this.raftOptions.getReadOnlyOptions();
    if (readOnlyOpt == ReadOnlyOption.ReadOnlyLeaseBased && !isLeaderLeaseValid()) {
        // If leader lease timeout, we must change option to ReadOnlySafe
        readOnlyOpt = ReadOnlyOption.ReadOnlySafe;
    }
    switch (readOnlyOpt) {
        case ReadOnlySafe:
            final List<PeerId> peers = this.conf.getConf().getPeers();
            Requires.requireTrue(peers != null && !peers.isEmpty(), "Empty peers");
            final ReadIndexHeartbeatResponseClosure heartbeatDone = new ReadIndexHeartbeatResponseClosure(closure,
                respBuilder, quorum, peers.size());
            // Send heartbeat requests to followers
            for (final PeerId peer : peers) {
                if (peer.equals(this.serverId)) {
                    continue;
                }
                // Send a heartbeat to every other node
                this.replicatorGroup.sendHeartbeat(peer, heartbeatDone);
            }
            break;
        case ReadOnlyLeaseBased:
            // Responses to followers and local node.
            respBuilder.setSuccess(true);
            closure.setResponse(respBuilder.build());
            closure.run(Status.OK());
            break;
    }
}
```
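The branching that `readLeader` performs before any heartbeat is sent can be summarised as a pure function. This is a hypothetical sketch: the enums are local stand-ins whose constant names mirror `ReadOnlyOption`, the lease check is passed in as a boolean rather than computed, and the follower-configuration check is left out.

```java
// Hypothetical sketch of readLeader's decision logic (not sofa-jraft code).
class ReadLeaderDecisionSketch {
    // Local stand-ins; constant names mirror sofa-jraft's ReadOnlyOption values.
    enum ReadOnlyOption { ReadOnlySafe, ReadOnlyLeaseBased }

    enum Outcome { REPLY_NOW, REJECT_EAGAIN, SEND_HEARTBEATS }

    static Outcome decide(int quorum, long termOfLastCommitted, long currentTerm,
                          ReadOnlyOption option, boolean leaseValid) {
        if (quorum <= 1) {
            return Outcome.REPLY_NOW;             // single voter: nobody else to ask
        }
        if (termOfLastCommitted != currentTerm) {
            return Outcome.REJECT_EAGAIN;         // nothing committed in this term yet
        }
        if (option == ReadOnlyOption.ReadOnlyLeaseBased && !leaseValid) {
            option = ReadOnlyOption.ReadOnlySafe; // expired lease: fall back to heartbeats
        }
        return option == ReadOnlyOption.ReadOnlySafe
            ? Outcome.SEND_HEARTBEATS             // confirm leadership with a quorum round
            : Outcome.REPLY_NOW;                  // valid lease: reply without a round trip
    }

    public static void main(String[] args) {
        System.out.println(decide(2, 5, 5, ReadOnlyOption.ReadOnlyLeaseBased, false)); // SEND_HEARTBEATS
    }
}
```

The lease-based mode trades a heartbeat round for a dependency on bounded clock drift, which is why an expired lease always degrades to the safe mode rather than failing the read.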
```java
private class ReadIndexHeartbeatResponseClosure extends RpcResponseClosureAdapter<AppendEntriesResponse> {
    final ReadIndexResponse.Builder respBuilder;
    final RpcResponseClosure<ReadIndexResponse> closure;
    final int quorum;
    final int failPeersThreshold;
    int ackSuccess;
    int ackFailures;
    boolean isDone;

    public ReadIndexHeartbeatResponseClosure(final RpcResponseClosure<ReadIndexResponse> closure,
                                             final ReadIndexResponse.Builder rb, final int quorum,
                                             final int peersCount) {
        super();
        this.closure = closure;
        this.respBuilder = rb;
        this.quorum = quorum;
        this.failPeersThreshold = peersCount % 2 == 0 ? (quorum - 1) : quorum;
        this.ackSuccess = 0;
        this.ackFailures = 0;
        this.isDone = false;
    }

    @Override
    public synchronized void run(final Status status) {
        if (this.isDone) {
            return;
        }
        if (status.isOk() && getResponse().getSuccess()) {
            this.ackSuccess++;
        } else {
            this.ackFailures++;
        }
        // A majority (the leader itself plus ackSuccess followers) acknowledged the heartbeat,
        // so this node is still the leader: run the callback
        if (this.ackSuccess + 1 >= this.quorum) {
            this.respBuilder.setSuccess(true);
            this.closure.setResponse(this.respBuilder.build());
            this.closure.run(Status.OK());
            this.isDone = true;
        } else if (this.ackFailures >= this.failPeersThreshold) {
            // Too many failures: a majority can no longer be reached
            this.respBuilder.setSuccess(false);
            this.closure.setResponse(this.respBuilder.build());
            this.closure.run(Status.OK());
            this.isDone = true;
        }
    }
}
```
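The two thresholds in `ReadIndexHeartbeatResponseClosure` can be checked arithmetically. The sketch assumes the quorum is a simple majority, `n / 2 + 1` for `n` voting peers, and its method names are hypothetical. It verifies that `failPeersThreshold` is exactly the number of failed followers after which a majority can no longer be reached, so the closure decides as early as possible in either direction; the `isDone` flag then makes later responses no-ops.

```java
// Worked check of the heartbeat thresholds, assuming quorum = n / 2 + 1.
class HeartbeatThresholdSketch {
    static int quorum(int n) {
        return n / 2 + 1;
    }

    // Follower acks needed: the leader counts itself, hence "ackSuccess + 1 >= quorum".
    static int acksNeeded(int n) {
        return quorum(n) - 1;
    }

    // Same expression as in the closure's constructor.
    static int failPeersThreshold(int n) {
        return n % 2 == 0 ? quorum(n) - 1 : quorum(n);
    }

    // Smallest number of failed followers that makes a majority unreachable:
    // there are (n - 1) followers, and acksNeeded of them must succeed.
    static int failuresThatRuleOutQuorum(int n) {
        return (n - 1) - acksNeeded(n) + 1;
    }

    public static void main(String[] args) {
        for (int n = 2; n <= 7; n++) {
            System.out.printf("n=%d quorum=%d acksNeeded=%d failThreshold=%d matches=%b%n",
                n, quorum(n), acksNeeded(n), failPeersThreshold(n),
                failPeersThreshold(n) == failuresThatRuleOutQuorum(n));
        }
    }
}
```

For example, with 3 peers one follower ack is enough to succeed and two failures end the read; with 4 peers the leader needs two follower acks, and two failures already leave at most one, hence `quorum - 1` for even cluster sizes.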
Once leadership has been confirmed, the node still has to wait until the state machine has applied up to readIndex:
```java
class ReadIndexResponseClosure extends RpcResponseClosureAdapter<ReadIndexResponse> {

    final List<ReadIndexState> states;
    final ReadIndexRequest request;

    public ReadIndexResponseClosure(final List<ReadIndexState> states, final ReadIndexRequest request) {
        super();
        this.states = states;
        this.request = request;
    }

    /**
     * Called when ReadIndex response returns.
     */
    @Override
    public void run(final Status status) {
        if (!status.isOk()) {
            notifyFail(status);
            return;
        }
        final ReadIndexResponse readIndexResponse = getResponse();
        if (!readIndexResponse.getSuccess()) {
            notifyFail(new Status(-1, "Fail to run ReadIndex task, maybe the leader stepped down."));
            return;
        }
        // Success
        final ReadIndexStatus readIndexStatus = new ReadIndexStatus(this.states, this.request,
            readIndexResponse.getIndex());
        for (final ReadIndexState state : this.states) {
            // Records current commit log index.
            state.setIndex(readIndexResponse.getIndex());
        }

        boolean doUnlock = true;
        ReadOnlyServiceImpl.this.lock.lock();
        try {
            // The state machine has already applied up to readIndex, so the read can be served now
            if (readIndexStatus.isApplied(ReadOnlyServiceImpl.this.fsmCaller.getLastAppliedIndex())) {
                // Already applied, notify readIndex request.
                ReadOnlyServiceImpl.this.lock.unlock();
                doUnlock = false;
                notifySuccess(readIndexStatus);
            } else {
                // The state machine has not reached readIndex yet: park the request in the pending queue
                ReadOnlyServiceImpl.this.pendingNotifyStatus
                    .computeIfAbsent(readIndexStatus.getIndex(), k -> new ArrayList<>(10)).add(readIndexStatus);
            }
        } finally {
            if (doUnlock) {
                ReadOnlyServiceImpl.this.lock.unlock();
            }
        }
    }

    private void notifyFail(final Status status) {
        final long nowMs = Utils.monotonicMs();
        for (final ReadIndexState state : this.states) {
            ReadOnlyServiceImpl.this.nodeMetrics.recordLatency("read-index", nowMs - state.getStartTimeMs());
            final ReadIndexClosure done = state.getDone();
            if (done != null) {
                final Bytes reqCtx = state.getRequestContext();
                done.run(status, ReadIndexClosure.INVALID_LOG_INDEX, reqCtx != null ? reqCtx.get() : null);
            }
        }
    }
}
```
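When the state machine lags behind readIndex, the read waits in `pendingNotifyStatus`, and `init` above wires up both an apply listener and a periodic scanner that call `onApplied`. The sketch below models that bookkeeping with a `TreeMap` keyed by readIndex, so that a single `headMap(appliedIndex, true)` call releases every read that has become servable. The class and method names, and the choice of `TreeMap`, are assumptions for illustration rather than a copy of `ReadOnlyServiceImpl`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of pending-read bookkeeping: reads ahead of the applied index
// wait in a map ordered by readIndex and are released as the state machine advances.
class PendingReadsSketch {
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    private long appliedIndex;

    // Serve now if already applied, otherwise park the read under its readIndex.
    synchronized List<String> submit(long readIndex, String read) {
        if (this.appliedIndex >= readIndex) {
            return List.of(read);
        }
        this.pending.computeIfAbsent(readIndex, k -> new ArrayList<>()).add(read);
        return List.of();
    }

    // Called on apply events (or by a periodic scan); returns reads that became servable.
    synchronized List<String> onApplied(long newAppliedIndex) {
        this.appliedIndex = Math.max(this.appliedIndex, newAppliedIndex);
        NavigableMap<Long, List<String>> ready = this.pending.headMap(this.appliedIndex, true);
        List<String> released = new ArrayList<>();
        ready.values().forEach(released::addAll);
        ready.clear(); // headMap is a view, so this also removes the entries from "pending"
        return released;
    }

    public static void main(String[] args) {
        PendingReadsSketch s = new PendingReadsSketch();
        s.onApplied(5);
        System.out.println(s.submit(4, "r1")); // [r1]
        System.out.println(s.submit(8, "r2")); // []
        System.out.println(s.submit(7, "r3")); // []
        System.out.println(s.onApplied(7));    // [r3]
        System.out.println(s.onApplied(9));    // [r2]
    }
}
```

Returning the released reads instead of running callbacks while holding the monitor follows the same idea as the code above, which unlocks before calling `notifySuccess`.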
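Copyright notice: This content was contributed voluntarily by a real-name registered Alibaba Cloud user, and the copyright belongs to the original author. The Alibaba Cloud Developer Community does not own the copyright and assumes no related legal liability. For the specific rules, see the "Alibaba Cloud Developer Community User Service Agreement" and the "Alibaba Cloud Developer Community Intellectual Property Protection Guidelines". If you find content in this community that you suspect is plagiarized, report it by filling in the infringement complaint form; once verified, the community will remove the infringing content immediately.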