用java模拟raft算法
通过创建5个Node对象模拟一个集群中,节点是如何选举的
核心逻辑
randomPauseTime:随机1.x秒,用于每个node等待成为竞选者,随机数有利于避免同一时间出现多个竞选者
createVote:发起投票,如果等待的时间到期了,节点就会变成一个竞选者发起投票,如果获得的票数大于集群数量的一半,则自己当选为Leader
vote:用于给竞选者投票,只要当前节点不是竞选者,就同意给竞选者投票
sendPing:用于Leader给Follower发起心跳包
receivePing:Follower接收Leader的心跳包,并且重置自己的参选时间
start:用于模拟一整个时间线
Tips
这里没有模拟Term(任期),raft算法中,leader是有任期的
RaftDemo.java
package com.example.demoground.demo;
import cn.hutool.core.date.format.FastDateFormat;
import lombok.Data;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
public class RaftDemo {
private static Long randomPauseTime() {
return (long) (Math.random() * 1000 + 1000);
}
@Data
public static class Node {
private String name; // 节点名字
private String role; // 角色
private List<Node> nodeList; //互相感知的node
private volatile Node leader; // 主节点
private volatile boolean isCandidate = false; // 是否是参选
private volatile long candidateTime; // 参选时间
private volatile long sendPingTime; // leader节点下次发送ping时间
private volatile boolean online; // 是否在线
private volatile long offlineTime;
private volatile boolean isVoted; // 是否已经投票
public Node(String name, List<Node> nodeList) {
this.name = name;
this.role = "follower";
this.nodeList = nodeList;
this.online = true;
this.isCandidate = false;
this.candidateTime = System.currentTimeMillis() + randomPauseTime();
nodeList.add(this);
}
public String getName() {
return String.format("%s %s(%s)", FastDateFormat.getInstance("HH:mm:ss").format(new Date()), name, role);
}
public void receivePing(Node leader) {
candidateTime = System.currentTimeMillis() + randomPauseTime();
this.leader = leader;
this.role = "follower";
this.isCandidate = false;
this.isVoted = false;
}
public synchronized int vote(Node candidate) {
if (!this.online || this.isCandidate || this.isVoted) {
return 0;
}
System.out.printf("%s: 向%s投票%n", getName(), candidate.name);
this.candidateTime = System.currentTimeMillis() + randomPauseTime();
this.isVoted = true;
return 1;
}
/**
* 向follower节点发起ping信号
*/
public void sendPing() {
for (Node node : nodeList) {
if (node != this && node.isOnline()) {
node.receivePing(this);
System.out.printf("%s: 向%s发送ping信号%n", this.getName(), node.name);
}
}
this.sendPingTime = System.currentTimeMillis() + 1000;
}
/**
* 发起投票
*/
public void createVote() {
System.out.printf("%s: 发起投票%n", getName());
this.isCandidate = true;
int count = 1;
for (Node node : this.nodeList) {
if (node != this) {
count += node.vote(this);
}
}
System.out.printf("%s: 投票结果: %d%n", getName(), count);
if (count > nodeList.size() / 2) {
this.role = "leader";
this.leader = this;
this.isCandidate = false;
this.offlineTime = System.currentTimeMillis() + 3000;
System.out.printf("%s: 当选leader节点, 3s后下线%n", getName());
sendPing();
} else {
this.role = "follower";
this.leader = null;
this.isCandidate = false;
this.isVoted = false;
System.out.printf("%s: 未当选leader节点%n", getName());
this.candidateTime = System.currentTimeMillis() + randomPauseTime();
}
}
public void start() {
new Thread(() -> {
while (true) {
try {
// 内部心跳时间, 为了接收其他node发起的事件
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (!this.isOnline()) {
System.out.printf("%s: 节点已下线%n", getName());
break;
}
if (this.nodeList.stream().filter(Node::isOnline).count() < 3) {
System.out.printf("%s: 节点数小于3, 集群不可用%n", getName());
break;
}
// 如果当前节点是leader,则发送ping信号
if (this == this.leader) {
if (System.currentTimeMillis() >= this.offlineTime) {
this.online = false;
System.out.printf("%s: leader节点下线%n", getName());
break;
}
if (System.currentTimeMillis() >= this.sendPingTime) {
sendPing();
}
continue;
}
if (System.currentTimeMillis() < this.candidateTime) {
continue;
}
this.createVote();
}
}).start();
}
}
public static void main(String[] args) {
List<Node> nodeList = new ArrayList<>();
Node node1 = new Node("node1", nodeList);
Node node2 = new Node("node2", nodeList);
Node node3 = new Node("node3", nodeList);
Node node4 = new Node("node4", nodeList);
Node node5 = new Node("node5", nodeList);
node1.start();
node2.start();
node3.start();
node4.start();
node5.start();
}
}
输出日志
14:27:05 node2(follower): 发起投票
14:27:05 node4(follower): 发起投票
14:27:05 node1(follower): 向node2投票
14:27:05 node3(follower): 向node2投票
14:27:05 node5(follower): 向node2投票
14:27:05 node2(follower): 投票结果: 4
14:27:05 node4(follower): 投票结果: 1
14:27:05 node2(leader): 当选leader节点, 3s后下线
14:27:05 node4(follower): 未当选leader节点
14:27:05 node2(leader): 向node1发送ping信号
14:27:05 node2(leader): 向node3发送ping信号
14:27:05 node2(leader): 向node4发送ping信号
14:27:05 node2(leader): 向node5发送ping信号
14:27:06 node2(leader): 向node1发送ping信号
14:27:06 node2(leader): 向node3发送ping信号
14:27:06 node2(leader): 向node4发送ping信号
14:27:06 node2(leader): 向node5发送ping信号
14:27:07 node2(leader): 向node1发送ping信号
14:27:07 node2(leader): 向node3发送ping信号
14:27:07 node2(leader): 向node4发送ping信号
14:27:07 node2(leader): 向node5发送ping信号
14:27:08 node2(leader): leader节点下线
14:27:08 node3(follower): 发起投票
14:27:08 node1(follower): 向node3投票
14:27:08 node4(follower): 向node3投票
14:27:08 node5(follower): 向node3投票
14:27:08 node3(follower): 投票结果: 4
14:27:08 node3(leader): 当选leader节点, 3s后下线
14:27:08 node3(leader): 向node1发送ping信号
14:27:08 node3(leader): 向node4发送ping信号
14:27:08 node3(leader): 向node5发送ping信号
14:27:09 node3(leader): 向node1发送ping信号
14:27:09 node3(leader): 向node4发送ping信号
14:27:09 node3(leader): 向node5发送ping信号
14:27:10 node3(leader): 向node1发送ping信号
14:27:10 node3(leader): 向node4发送ping信号
14:27:10 node3(leader): 向node5发送ping信号
14:27:11 node3(leader): leader节点下线
14:27:11 node1(follower): 发起投票
14:27:12 node4(follower): 向node1投票
14:27:12 node5(follower): 向node1投票
14:27:12 node1(follower): 投票结果: 3
14:27:12 node1(leader): 当选leader节点, 3s后下线
14:27:12 node1(leader): 向node4发送ping信号
14:27:12 node1(leader): 向node5发送ping信号
14:27:13 node1(leader): 向node4发送ping信号
14:27:13 node1(leader): 向node5发送ping信号
14:27:14 node1(leader): 向node4发送ping信号
14:27:14 node1(leader): 向node5发送ping信号
14:27:15 node1(leader): leader节点下线
14:27:15 node4(follower): 节点数小于3, 集群不可用
14:27:15 node5(follower): 节点数小于3, 集群不可用