在部署应用时想要应用是高可用,通常会在应用前放置一个 HAProxy,当任何一个 Server 故障,HAProxy 会自动切换,但是 HAProxy 也存在单点故障,因此需要多个 HAProxy 来保证业务不中断,这时候我们需要另一个软件配合:Keepalived。通常我用 Keepalived 仅用来提供 VIP,保证当一个 Keepalived 故障,VIP 自动在其他 Keepalived 节点配置。
Keepalived 有一个问题是 virtual route ID 必须是同一网段内唯一的,当我们想要在一个网段内部署多个集群时,就需要人为的介入去分配 virtual route ID,不方便。这次来使用 Raft 自己实现 VIP 逻辑。
hashicorp/raft
Raft 有很多开源实现,其中 Hashicorp 实现的 Raft 库 已经被 Consul 等软件使用,且接口友善,选择使用它来实现。在 Github 上有很多 Raft 的使用示例,比较简单且完整的是 otoolep/hraftd,我们来看看他是怎么使用的。
otoolep/hraftd
main.go
在 main.go 中主要做了 4 件事情:store.New
, store.Open
, http.New
, http.Start
,先来看看程序是如何启动的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| func init() { // 设置命令行参数 flag.BoolVar(&inmem, "inmem", false, "Use in-memory storage for Raft") ... flag.Usage = func() { fmt.Fprintf(os.Stderr, "Usage: %s [options] <raft-data-path> \n", os.Args[0]) flag.PrintDefaults() } }
func main() { // 解析命令行参数 flag.Parse() ... // 创建一个 Store 对象 s := store.New(inmem) s.RaftDir = raftDir s.RaftBind = raftAddr // 运行 Store if err := s.Open(joinAddr == "", nodeID); err != nil { log.Fatalf("failed to open store: %s", err.Error()) } // 新建一个 http 对象并运行 h := httpd.New(httpAddr, s) if err := h.Start(); err != nil { log.Fatalf("failed to start HTTP service: %s", err.Error()) }
// 如果 joinAddr 参数不为空,则处理 join 请求 if joinAddr != "" { if err := join(joinAddr, raftAddr, nodeID); err != nil { log.Fatalf("failed to join node at %s: %s", joinAddr, err.Error()) } }
log.Println("hraftd started successfully") // 监听系统信号,若接收到 os.Interrupt 则程序退出 terminate := make(chan os.Signal, 1) signal.Notify(terminate, os.Interrupt) <-terminate log.Println("hraftd exiting") }
|
看了 main.go 我们知道调用了 http.Start
, 先不管 Store 是什么,先来看看 http 相关实现:
1 2 3 4 5 6 7 8
| // Start starts the service. func (s *Service) Start() error { // Service 实现了 ServeHTTP 方法 http.Handle("/", s) go func() { err := server.Serve(s.ln) ... }
|
1 2 3 4 5
| func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { if strings.HasPrefix(r.URL.Path, "/key") { s.handleKeyRequest(w, r) // 先忽略其他分支 }
|
主要处理请求的是 s.handleKeyRequest
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| func (s *Service) handleKeyRequest(w http.ResponseWriter, r *http.Request) { ... switch r.Method { case "GET": k := getKey() if k == "" { w.WriteHeader(http.StatusBadRequest) } v, err := s.store.Get(k) ... io.WriteString(w, string(b)) case "POST": // Read the value from the POST body. m := map[string]string{} if err := json.NewDecoder(r.Body).Decode(&m); err != nil { w.WriteHeader(http.StatusBadRequest) return } for k, v := range m { if err := s.store.Set(k, v); err != nil { w.WriteHeader(http.StatusInternalServerError) return } } ...
|
在 s.handleKeyRequest
中根据请求方法,去调用 store 对应的方法,那么 store 实现了哪些接口呢?这也是在 http 模块中定义的:
1 2 3 4 5 6
| type Store interface { Get(key string) (string, error) Set(key, value string) error Delete(key string) error Join(nodeID string, addr string) error }
|
除了 Join 看着比较奇怪,其他的都是一个 K/V 系统该有的接口,接下来就看看 Store 具体方法的实现。
store
这个模块的编写涉及到了 Raft 中的具体概念,建议先阅读 siddontang 写的 Raft 相关博客快速了解(链接在参考链接列出)。
以下以设置 Key 为例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| func (s *Store) Set(key, value string) error { if s.raft.State() != raft.Leader { return fmt.Errorf("not leader") }
// 封装具体执行的动作 c := &command{ Op: "set", Key: key, Value: value, } b, err := json.Marshal(c) ... // 将 command 应用于 FSM f := s.raft.Apply(b, raftTimeout) return f.Error() }
|
查看 FSM Apply 方法实现:
1 2 3 4 5 6 7 8 9 10
| // Apply applies a Raft log entry to the key-value store. func (f *fsm) Apply(l *raft.Log) interface{} { var c command // 根据操作动作的不同,执行不同的方法,这里以设置 Key 为例 switch c.Op { case "set": return f.applySet(c.Key, c.Value) ... } }
|
1 2 3 4 5 6 7 8
| func (f *fsm) applySet(key, value string) interface{} { // 互斥锁 f.mu.Lock() defer f.mu.Unlock() // 设置 Map 中的 Key/Value f.m[key] = value return nil }
|
看了上面的具体动作实现,接下来看看 Raft 具体启动:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| func (s *Store) Open(enableSingle bool, localID string) error { // 设置 Raft 配置 config := raft.DefaultConfig() config.LocalID = raft.ServerID(localID) // 设置 Raft 通信 addr, err := net.ResolveTCPAddr("tcp", s.RaftBind) ... transport, err := raft.NewTCPTransport(s.RaftBind, addr, 3, 10*time.Second, os.Stderr) // 设置 Raft 存储对象 snapshots, err := raft.NewFileSnapshotStore(s.RaftDir, retainSnapshotCount, os.Stderr) var logStore raft.LogStore var stableStore raft.StableStore if s.inmem { logStore = raft.NewInmemStore() stableStore = raft.NewInmemStore() } else { boltDB, err := raftboltdb.NewBoltStore(filepath.Join(s.RaftDir, "raft.db")) if err != nil { return fmt.Errorf("new bolt store: %s", err) } logStore = boltDB stableStore = boltDB } // 创建 raft 示例,并使用该 raft 实例启动集群 ra, err := raft.NewRaft(config, (*fsm)(s), logStore, stableStore, snapshots, transport) if err != nil { return fmt.Errorf("new raft: %s", err) } s.raft = ra if enableSingle { ... ra.BootstrapCluster(configuration) } }
|
这里可以看到 Raft 所需接口为 FSM 和 Snapshot,具体的实现方式根据需求来实现,一般与 hraftd 相仿,大概了解了 hashicorp/raft 的使用,那么我们来实现具体的 VIP 功能。
network
既然是跟 IP 相关,那么肯定需要给对应网卡配置时间,在 Linux 中我们可以通过 ip
命令来设置,Golang 中使用 vishvananda/netlink 来实现。
netlink.AddrAdd
可以在指定的网络设备上添加 IP 地址。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| func (nc *NetworkConfig) addIP() error { res, err := nc.IsSet() if err != nil { return errors.Wrap(err, "ip check in AddIP failed")
} if res { return nil } if err := netlink.AddrAdd(nc.link, nc.address); err != nil { return errors.Wrap(err, "could not add ip") } return nil }
|
netlink.AddrDel
可以将 IP 从指定网络设备上删除:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| func (nc *NetworkConfig) delIP() error { res, err := nc.IsSet() if err != nil { return errors.Wrap(err, "ip check in DelIP failed") }
if !res { return nil }
if err := netlink.AddrDel(nc.link, nc.address); err != nil { return errors.Wrap(err, "could not delete ip") }
return nil }
|
实现了 IP 设置相关,我们不需要提供 HTTP 接口,直接编写 Raft 相关实现,跟 hraftd 实现不同,在 hraftd 中需要进行信息写入读取,而我们的 VIP 仅依赖于 Raft 选举 Leader,所以只需要编写好对应的方法,不需要做额外操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| type FSM struct { }
func (fsm FSM) Apply(log *raft.Log) interface{} { return nil }
func (fsm FSM) Restore(snap io.ReadCloser) error { return nil }
func (fsm FSM) Snapshot() (raft.FSMSnapshot, error) { return Snapshot{}, nil }
type Snapshot struct { }
func (snapshot Snapshot) Persist(sink raft.SnapshotSink) error { return nil }
func (snapshot Snapshot) Release() { }
|
serve
基础方法都已经实现了,那么接下来编写集群启动逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| func (manager *VIPManager) Start() error { // 初始化 raft 配置、存储对象、通信 for id, ip := range manager.peers { configuration.Servers = append(configuration.Servers, raft.Server{ID: raft.ServerID(id), Address: raft.ServerAddress(ip)}) }
// 启动 Raft 集群,这里与 hraftd 不同,需要注意 if error := raft.BootstrapCluster(config, logStore, stableStore, snapshots, transport, configuration); error != nil { return error }
// 创建 raft 实例 raftServer, error := raft.NewRaft(config, manager.fsm, logStore, stableStore, snapshots, transport) ... ticker := time.NewTicker(time.Second) isLeader := false // 服务启动时先删除 VIP,防止集群中同时存在节点都配置了 VIP manager.deleteIP(false) go func() { for { select { // 如果 当前节点是 Leader 节点,则设置 VIP case leader := <-raftServer.LeaderCh(): if leader { isLeader = true log.Info("Leading") manager.addIP(true)
} // 定时检测,如果是 Leader,则检测 VIP 是否正确设置,如果没有就再次配置 VIP case <-ticker.C: if isLeader { result, error := manager.networkConfigurator.IsSet() if error != nil { log.WithFields(log.Fields{"error": error, "ip": manager.networkConfigurator.IP(), "interface": manager.networkConfigurator.Interface()}).Error("Could not check ip") }
if result == false { log.Error("Lost IP")
manager.addIP(true) } } ... } } }() }
|
这里与 hraftd 的实现不同,hraftd 先实例 raft,然后使用该 raft 实例启动集群,这样做的好处是哪怕集群只有一个节点,就是第一个节点,那么集群也是可以正常工作的,坏处是集群启动顺序是固定的,必须要先启动第一个节点,然后其他节点通过 Join 请求添加到 Raft 集群中(我们忽略了 Join 的走读)。
重新想一下我们的需求:集群、高可用、故障。当这几个词放在一起,我们就知道 hraftd 的方法不适合我们,有以下原因:
- 集群节点启动顺序要求
- 各个节点配置文件不同,有的需要 Join 参数
所以我们是直接使用 raft.BootstrapCluster
来启动集群,虽然只有一个节点集群无法正常工作,但是这个是可以容忍的。
在实现完上述功能后,我以为大功告成了,就开始自测,但是测试过程中发现了很诡异的现象,虽然我们通过 Raft 自身选举实现了 VIP 的故障自动漂移,但是实际测试中发现业务访问随着 VIP 的重建并没有立即恢复,,检查 ARP 记录发现集群中各个节点关于 VIP 的 ARP 记录各不相同,甚至是完全不同。
我们来重温下 ARP 协议内容:
地址解析协议(英語:Address Resolution Protocol,缩写:ARP)是一个通过解析网络层地址来找寻数据链路层地址的网络传输协议,它在IPv4中极其重要。ARP最初在1982年的RFC 826(征求意见稿)[1]中提出并纳入互联网标准 STD 37. ARP 也可能指是在多数操作系统中管理其相关地址的一个进程。
在以太网协议中规定,同一局域网中的一台主机要和另一台主机进行直接通信,必须要知道目标主机的MAC地址。而在TCP/IP协议中,网络层和传输层只关心目标主机的IP地址。这就导致在以太网中使用IP协议时,数据链路层的以太网协议接到上层IP协议提供的数据中,只包含目的主机的IP地址。于是需要一种方法,根据目的主机的IP地址,获得其MAC地址。这就是ARP协议要做的事情。所谓地址解析(address resolution)就是主机在发送帧前将目标IP地址转换成目标MAC地址的过程。
如果 VIP 与真实节点对应的 MAC 地址不同,就相当于 ARP 攻击了,所以我们的 Leader 节点设置完 VIP 后,还需要发送 ARP 请求广播,告诉广播域中的其他节点 VIP 正确的 MAC 地址。采用的方式是 gratuitous ARP(免费 ARP)。这里我们直接找一个开源的 ARP 实现来完成这个需求:
google/seesaw 是一个负载均衡器,里面实现了这个功能:
1 2 3 4 5 6 7 8 9 10 11 12 13
| // ARPSendGratuitous sends a gratuitous ARP message via the specified interface. func (ncc *SeesawNCC) ARPSendGratuitous(arp *ncctypes.ARPGratuitous, out *int) error { iface, err := net.InterfaceByName(arp.IfaceName) if err != nil { return fmt.Errorf("failed to get interface %q: %v", arp.IfaceName, err) } log.V(2).Infof("Sending gratuitous ARP for %s (%s) via %s", arp.IP, iface.HardwareAddr, iface.Name) m, err := gratuitousARPReply(arp.IP, iface.HardwareAddr) if err != nil { return err } return sendARP(iface, m) }
|
使用 hashicorp/raft 可以很简单方便的实现需要选举 Leader 的分布式应用,虽然我司的 Slogan 是 Make IT Simple
,但是愈发感觉 Hashicorp 才是这句话的忠实体现,他们的 Terraform、Vault、Consul、Nomad、Vagrant 等软件,都是让基础设施的适用与管理更简单方便的,还是很爽的。
本文的具体实现在 sparrow 可以看到。