MelonBlog

用java模拟raft算法

通过创建5个Node对象模拟一个集群中,节点是如何选举的

核心逻辑

randomPauseTime:随机1.x秒,用于每个node等待成为竞选者,随机数有利于避免同一时间出现多个竞选者

createVote:发起投票,如果等待的时间到期了,节点就会变成一个竞选者发起投票,如果获得的票数大于集群数量的一半,则自己当选为Leader

vote:用于给竞选者投票,只要当前节点不是竞选者,就同意给竞选者投票

sendPing:用于Leader给Follower发起心跳包

receivePing:Follower接收Leader的心跳包,并且重置自己的参选时间

start:用于模拟一整个时间线


Tips

这里没有模拟Term(任期),raft算法中,leader是有任期的

RaftDemo.java

package com.example.demoground.demo;
import cn.hutool.core.date.format.FastDateFormat;
import lombok.Data;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
public class RaftDemo {
    private static Long randomPauseTime() {
        return (long) (Math.random() * 1000 + 1000);
    }
    @Data
    public static class Node {
        private String name; // 节点名字
        private String role; // 角色
        private List<Node> nodeList; //互相感知的node
        private volatile Node leader; // 主节点
        private volatile boolean isCandidate = false; // 是否是参选
        private volatile long candidateTime; // 参选时间
        private volatile long sendPingTime; // leader节点下次发送ping时间
        private volatile boolean online; // 是否在线
        private volatile long offlineTime;
        private volatile boolean isVoted; // 是否已经投票
        public Node(String name, List<Node> nodeList) {
            this.name = name;
            this.role = "follower";
            this.nodeList = nodeList;
            this.online = true;
            this.isCandidate = false;
            this.candidateTime = System.currentTimeMillis() + randomPauseTime();
            nodeList.add(this);
        }
        public String getName() {
            return String.format("%s %s(%s)", FastDateFormat.getInstance("HH:mm:ss").format(new Date()), name, role);
        }
        public void receivePing(Node leader) {
            candidateTime = System.currentTimeMillis() + randomPauseTime();
            this.leader = leader;
            this.role = "follower";
            this.isCandidate = false;
            this.isVoted = false;
        }
        public synchronized int vote(Node candidate) {
            if (!this.online || this.isCandidate || this.isVoted) {
                return 0;
            }
            System.out.printf("%s: 向%s投票%n", getName(), candidate.name);
            this.candidateTime = System.currentTimeMillis() + randomPauseTime();
            this.isVoted = true;
            return 1;
        }
        /**
         * 向follower节点发起ping信号
         */
        public void sendPing() {
            for (Node node : nodeList) {
                if (node != this && node.isOnline()) {
                    node.receivePing(this);
                    System.out.printf("%s: 向%s发送ping信号%n", this.getName(), node.name);
                }
            }
            this.sendPingTime = System.currentTimeMillis() + 1000;
        }
        /**
         * 发起投票
         */
        public void createVote() {
            System.out.printf("%s: 发起投票%n", getName());
            this.isCandidate = true;
            int count = 1;
            for (Node node : this.nodeList) {
                if (node != this) {
                    count += node.vote(this);
                }
            }
            System.out.printf("%s: 投票结果: %d%n", getName(), count);
            if (count > nodeList.size() / 2) {
                this.role = "leader";
                this.leader = this;
                this.isCandidate = false;
                this.offlineTime = System.currentTimeMillis() + 3000;
                System.out.printf("%s: 当选leader节点, 3s后下线%n", getName());
                sendPing();
            } else {
                this.role = "follower";
                this.leader = null;
                this.isCandidate = false;
                this.isVoted = false;
                System.out.printf("%s: 未当选leader节点%n", getName());
                this.candidateTime = System.currentTimeMillis() + randomPauseTime();
            }
        }
        public void start() {
            new Thread(() -> {
                while (true) {
                    try {
                        // 内部心跳时间, 为了接收其他node发起的事件
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if (!this.isOnline()) {
                        System.out.printf("%s: 节点已下线%n", getName());
                        break;
                    }
                    if (this.nodeList.stream().filter(Node::isOnline).count() < 3) {
                        System.out.printf("%s: 节点数小于3, 集群不可用%n", getName());
                        break;
                    }
                    // 如果当前节点是leader,则发送ping信号
                    if (this == this.leader) {
                        if (System.currentTimeMillis() >= this.offlineTime) {
                            this.online = false;
                            System.out.printf("%s: leader节点下线%n", getName());
                            break;
                        }
                        if (System.currentTimeMillis() >= this.sendPingTime) {
                            sendPing();
                        }
                        continue;
                    }
                    if (System.currentTimeMillis() < this.candidateTime) {
                        continue;
                    }
                    this.createVote();
                }
            }).start();
        }
    }
    public static void main(String[] args) {
        List<Node> nodeList = new ArrayList<>();
        Node node1 = new Node("node1", nodeList);
        Node node2 = new Node("node2", nodeList);
        Node node3 = new Node("node3", nodeList);
        Node node4 = new Node("node4", nodeList);
        Node node5 = new Node("node5", nodeList);
        node1.start();
        node2.start();
        node3.start();
        node4.start();
        node5.start();
    }
}

输出日志

14:27:05 node2(follower): 发起投票
14:27:05 node4(follower): 发起投票
14:27:05 node1(follower): 向node2投票
14:27:05 node3(follower): 向node2投票
14:27:05 node5(follower): 向node2投票
14:27:05 node2(follower): 投票结果: 4
14:27:05 node4(follower): 投票结果: 1
14:27:05 node2(leader): 当选leader节点, 3s后下线
14:27:05 node4(follower): 未当选leader节点
14:27:05 node2(leader): 向node1发送ping信号
14:27:05 node2(leader): 向node3发送ping信号
14:27:05 node2(leader): 向node4发送ping信号
14:27:05 node2(leader): 向node5发送ping信号
14:27:06 node2(leader): 向node1发送ping信号
14:27:06 node2(leader): 向node3发送ping信号
14:27:06 node2(leader): 向node4发送ping信号
14:27:06 node2(leader): 向node5发送ping信号
14:27:07 node2(leader): 向node1发送ping信号
14:27:07 node2(leader): 向node3发送ping信号
14:27:07 node2(leader): 向node4发送ping信号
14:27:07 node2(leader): 向node5发送ping信号
14:27:08 node2(leader): leader节点下线
14:27:08 node3(follower): 发起投票
14:27:08 node1(follower): 向node3投票
14:27:08 node4(follower): 向node3投票
14:27:08 node5(follower): 向node3投票
14:27:08 node3(follower): 投票结果: 4
14:27:08 node3(leader): 当选leader节点, 3s后下线
14:27:08 node3(leader): 向node1发送ping信号
14:27:08 node3(leader): 向node4发送ping信号
14:27:08 node3(leader): 向node5发送ping信号
14:27:09 node3(leader): 向node1发送ping信号
14:27:09 node3(leader): 向node4发送ping信号
14:27:09 node3(leader): 向node5发送ping信号
14:27:10 node3(leader): 向node1发送ping信号
14:27:10 node3(leader): 向node4发送ping信号
14:27:10 node3(leader): 向node5发送ping信号
14:27:11 node3(leader): leader节点下线
14:27:11 node1(follower): 发起投票
14:27:12 node4(follower): 向node1投票
14:27:12 node5(follower): 向node1投票
14:27:12 node1(follower): 投票结果: 3
14:27:12 node1(leader): 当选leader节点, 3s后下线
14:27:12 node1(leader): 向node4发送ping信号
14:27:12 node1(leader): 向node5发送ping信号
14:27:13 node1(leader): 向node4发送ping信号
14:27:13 node1(leader): 向node5发送ping信号
14:27:14 node1(leader): 向node4发送ping信号
14:27:14 node1(leader): 向node5发送ping信号
14:27:15 node1(leader): leader节点下线
14:27:15 node4(follower): 节点数小于3, 集群不可用
14:27:15 node5(follower): 节点数小于3, 集群不可用