java 实现简易基于Dledger 的选举
1. 定义 Dledger 节点类,包含节点的状态、日志存储、选举和日志复制逻辑
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
class DledgerNode {private static final int ELECTION_TIMEOUT_MIN = 150;private static final int ELECTION_TIMEOUT_MAX = 300;private static final int HEARTBEAT_INTERVAL = 50;private long currentTerm;private int votedFor;private List<LogEntry> log;private int state;private int selfId;private List<Integer> peers;private ScheduledExecutorService scheduler;private long electionTimeout;private int votesReceived;public DledgerNode(int selfId, List<Integer> peers) {this.currentTerm = 0;this.votedFor = -1;this.log = new ArrayList<>();this.state = 0;this.selfId = selfId;this.peers = peers;this.scheduler = Executors.newScheduledThreadPool(1);resetElectionTimeout();}private void resetElectionTimeout() {Random random = new Random();this.electionTimeout = random.nextInt(ELECTION_TIMEOUT_MAX - ELECTION_TIMEOUT_MIN + 1) + ELECTION_TIMEOUT_MIN;scheduler.schedule(this::startElection, electionTimeout, TimeUnit.MILLISECONDS);}private void startElection() {state = 1;currentTerm++;votedFor = selfId;votesReceived = 1;VoteRequest request = new VoteRequest(currentTerm, selfId, getLastLogIndex(), getLastLogTerm());for (int peer : peers) {sendVoteRequest(peer, request);}}private void sendVoteRequest(int peer, VoteRequest request) {VoteResponse response = handleVoteRequest(peer, request);handleVoteResponse(response);}private VoteResponse handleVoteRequest(int peer, VoteRequest request) {if (request.term < currentTerm) {return new VoteResponse(currentTerm, false);}if (request.term > currentTerm) {currentTerm = request.term;state = 0;votedFor = -1;}if ((votedFor == -1 || votedFor == request.candidateId) &&(request.lastLogTerm > getLastLogTerm() ||(request.lastLogTerm == getLastLogTerm() && request.lastLogIndex >= getLastLogIndex()))) {votedFor = request.candidateId;resetElectionTimeout();return new VoteResponse(currentTerm, true);}return new VoteResponse(currentTerm, false);}private void handleVoteResponse(VoteResponse response) {if (response.term > currentTerm) {currentTerm = response.term;state = 0;votedFor = -1;resetElectionTimeout();}if (state == 1 && response.voteGranted) {votesReceived++;if (votesReceived > peers.size() / 2) {state = 2;System.out.println("Node " + selfId + " has been elected as the leader in term " + currentTerm);startSendingHeartbeats();}}}private void startSendingHeartbeats() {scheduler.scheduleAtFixedRate(() -> {AppendEntriesRequest request = new AppendEntriesRequest(currentTerm, selfId, getLastLogIndex(), getLastLogTerm(), new LogEntry[0], getCommitIndex());for (int peer : peers) {sendAppendEntriesRequest(peer, request);}}, 0, HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);}private void sendAppendEntriesRequest(int peer, AppendEntriesRequest request) {AppendEntriesResponse response = handleAppendEntriesRequest(peer, request);handleAppendEntriesResponse(response);}private AppendEntriesResponse handleAppendEntriesRequest(int peer, AppendEntriesRequest request) {if (request.term < currentTerm) {return new AppendEntriesResponse(currentTerm, false);}if (request.term > currentTerm) {currentTerm = request.term;state = 0;votedFor = -1;}resetElectionTimeout();if (request.prevLogIndex >= 0 && (request.prevLogIndex >= log.size() || log.get((int) request.prevLogIndex).term != request.prevLogTerm)) {return new AppendEntriesResponse(currentTerm, false);}for (LogEntry entry : request.entries) {if (request.prevLogIndex + 1 + log.indexOf(entry) < log.size()) {log.set((int) (request.prevLogIndex + 1 + log.indexOf(entry)), entry);} else {log.add(entry);}}if (request.leaderCommit > getCommitIndex()) {}return new AppendEntriesResponse(currentTerm, true);}private void handleAppendEntriesResponse(AppendEntriesResponse response) {if (response.term > currentTerm) {currentTerm = response.term;state = 0;votedFor = -1;resetElectionTimeout();}}private long getLastLogIndex() {return log.size() - 1;}private long getLastLogTerm() {if (log.isEmpty()) {return 0;}return log.get(log.size() - 1).term;}private long getCommitIndex() {return getLastLogIndex();}@Overridepublic String toString() {return "DledgerNode{" +"currentTerm=" + currentTerm +", votedFor=" + votedFor +", log=" + log +", state=" + state +", selfId=" + selfId +", peers=" + peers +", scheduler=" + scheduler +", electionTimeout=" + electionTimeout +", votesReceived=" + votesReceived +'}';}
}
2. 定义日志条目类
class LogEntry {long term;String data;public LogEntry(long term, String data) {this.term = term;this.data = data;}
}
class VoteRequest {long term;int candidateId;long lastLogIndex;long lastLogTerm;public VoteRequest(long term, int candidateId, long lastLogIndex, long lastLogTerm) {this.term = term;this.candidateId = candidateId;this.lastLogIndex = lastLogIndex;this.lastLogTerm = lastLogTerm;}
}
class VoteResponse {long term;boolean voteGranted;public VoteResponse(long term, boolean voteGranted) {this.term = term;this.voteGranted = voteGranted;}
}
class AppendEntriesRequest {long term;int leaderId;long prevLogIndex;long prevLogTerm;LogEntry[] entries;long leaderCommit;public AppendEntriesRequest(long term, int leaderId, long prevLogIndex, long prevLogTerm, LogEntry[] entries, long leaderCommit) {this.term = term;this.leaderId = leaderId;this.prevLogIndex = prevLogIndex;this.prevLogTerm = prevLogTerm;this.entries = entries;this.leaderCommit = leaderCommit;}
}
class AppendEntriesResponse {long term;boolean success;public AppendEntriesResponse(long term, boolean success) {this.term = term;this.success = success;}
}
3. Dledger 的测试用例
import java.util.Arrays;
import java.util.List;public class DledgerTest {public static void main(String[] args) {List<Integer> peers = Arrays.asList(1, 2, 3);DledgerNode node1 = new DledgerNode(1, peers);
DledgerNode node2 = new DledgerNode(2, peers);
DledgerNode node3 = new DledgerNode(3, peers);
try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}}
}